Commit ae5d62d2 authored by Jason Madden's avatar Jason Madden

Initial implementation of gevent.selectors.GeventSelector.

Fixes #1532, but there's potentially room for optimization. Committing the simple version for CI tests.
parent f0e3bb96
=======================================================
:mod:`gevent.selectors` -- High-level IO Multiplexing
=======================================================
.. automodule:: gevent.selectors
:members:
Add ``gevent.selectors`` containing ``GeventSelector``.
This is monkey-patched as ``selectors.DefaultSelector`` by default.
This is available on Python 2 if the ``selectors2`` backport is
installed. (This backport is installed automatically using the
``recommended`` extra.) When monkey-patching, ``selectors`` is made
available as an alias to this module.
......@@ -315,6 +315,8 @@ EXTRA_MONITOR = [
EXTRA_RECOMMENDED = [
# We need this at runtime to use the libev-CFFI and libuv backends
CFFI_DEP,
# Backport of selectors module to Python 2
'selectors2 ; python_version == "2.7"',
] + EXTRA_DNSPYTHON + EXTRA_EVENTS + EXTRA_MONITOR
......
......@@ -25,6 +25,7 @@ MAPPING = {
'gevent.local': '_threading_local',
'gevent.socket': 'socket',
'gevent.select': 'select',
'gevent.selectors': 'selectors' if PY3 else 'selectors2',
'gevent.ssl': 'ssl',
'gevent.thread': '_thread' if PY3 else 'thread',
'gevent.subprocess': 'subprocess',
......
......@@ -415,6 +415,20 @@ def patch_module(target_module, source_module, items=None,
return True
def _check_availability(name):
"""
Test that the source and target modules for *name* are
available and return them.
:raise ImportError: If the source or target cannot be imported.
:return: The tuple ``(gevent_module, target_module, target_module_name)``
"""
gevent_module = getattr(__import__('gevent.' + name), name)
target_module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(target_module_name)
return gevent_module, target_module, target_module_name
def _patch_module(name,
items=None,
_warnings=None,
......@@ -423,9 +437,7 @@ def _patch_module(name,
_notify_did_subscribers=True,
_call_hooks=True):
gevent_module = getattr(__import__('gevent.' + name), name)
module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(module_name)
gevent_module, target_module, target_module_name = _check_availability(name)
patch_module(target_module, gevent_module, items=items,
_warnings=_warnings, _patch_kwargs=_patch_kwargs,
......@@ -451,7 +463,7 @@ def _patch_module(name,
_notify_will_subscribers=False,
_notify_did_subscribers=False,
_call_hooks=False)
saved[alternate_name] = saved[module_name]
saved[alternate_name] = saved[target_module_name]
return gevent_module, target_module
......@@ -1013,20 +1025,48 @@ def patch_select(aggressive=True):
and :func:`select.poll` with :class:`gevent.select.poll` (where available).
If ``aggressive`` is true (the default), also remove other
blocking functions from :mod:`select` and (on Python 3.4 and
above) :mod:`selectors`:
blocking functions from :mod:`select` .
- :func:`select.epoll`
- :func:`select.kqueue`
- :func:`select.kevent`
- :func:`select.devpoll` (Python 3.5+)
"""
_patch_module('select',
_patch_kwargs={'aggressive': aggressive})
@_ignores_DoNotPatch
def patch_selectors(aggressive=True):
"""
Replace :class:`selectors.DefaultSelector` with
:class:`gevent.selectors.GeventSelector`.
If ``aggressive`` is true (the default), also remove other
blocking classes :mod:`selectors`:
- :class:`selectors.EpollSelector`
- :class:`selectors.KqueueSelector`
- :class:`selectors.DevpollSelector` (Python 3.5+)
On Python 2, the :mod:`selectors2` module is used instead
of :mod:`selectors` if it is available. If this module cannot
be imported, no patching is done and :mod:`gevent.selectors` is
not available.
In :func:`patch_all`, the *select* argument controls both this function
and :func:`patch_select`.
.. versionadded:: NEXT
"""
_patch_module('select',
try:
_check_availability('selectors')
except ImportError: # pragma: no cover
return
_patch_module('selectors',
_patch_kwargs={'aggressive': aggressive})
@_ignores_DoNotPatch
def patch_subprocess():
"""
......@@ -1178,6 +1218,7 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
patch_socket(dns=dns, aggressive=aggressive)
if select:
patch_select(aggressive=aggressive)
patch_selectors(aggressive=aggressive)
if ssl:
patch_ssl(_warnings=_warnings, _first_time=first_time)
if subprocess:
......
......@@ -258,6 +258,25 @@ class poll(object):
def modify(self, fd, eventmask):
self.register(fd, eventmask)
def _get_started_watchers(self, watcher_cb):
watchers = []
io = self.loop.io
MAXPRI = self.loop.MAXPRI
try:
for fd, flags in iteritems(self.fds):
watcher = io(fd, flags)
watchers.append(watcher)
watcher.priority = MAXPRI
watcher.start(watcher_cb, fd, pass_events=True)
except:
for awatcher in watchers:
awatcher.stop()
awatcher.close()
raise
return watchers
def poll(self, timeout=None):
"""
poll the registered fds.
......@@ -270,15 +289,8 @@ class poll(object):
i.e., block. This was always the case with libev.
"""
result = PollResult()
watchers = []
io = self.loop.io
MAXPRI = self.loop.MAXPRI
watchers = self._get_started_watchers(result.add_event)
try:
for fd, flags in iteritems(self.fds):
watcher = io(fd, flags)
watchers.append(watcher)
watcher.priority = MAXPRI
watcher.start(result.add_event, fd, pass_events=True)
if timeout is not None:
if timeout < 0:
# The docs for python say that an omitted timeout,
......@@ -320,7 +332,6 @@ class poll(object):
def _gevent_do_monkey_patch(patch_request):
aggressive = patch_request.patch_kwargs['aggressive']
target_mod = patch_request.target_module
patch_request.default_patch_items()
......@@ -334,55 +345,3 @@ def _gevent_do_monkey_patch(patch_request):
'kevent',
'devpoll',
)
if patch_request.PY3:
# TODO: Do we need to broadcast events about patching the selectors
# package? If so, must be careful to deal with DoNotPatch exceptions.
# Python 3 wants to use `select.select` as a member function,
# leading to this error in selectors.py (because
# gevent.select.select is not a builtin and doesn't get the
# magic auto-static that they do):
#
# r, w, _ = self._select(self._readers, self._writers, [], timeout)
# TypeError: select() takes from 3 to 4 positional arguments but 5 were given
#
# Note that this obviously only happens if selectors was
# imported after we had patched select; but there is a code
# path that leads to it being imported first (but now we've
# patched select---so we can't compare them identically). It also doesn't
# happen on Windows, because they define a normal method for _select, to work around
# some weirdness in the handling of the third argument.
orig_select_select = patch_request.get_original('select', 'select')
assert target_mod.select is not orig_select_select
selectors = __import__('selectors')
if selectors.SelectSelector._select in (target_mod.select, orig_select_select):
def _select(self, *args, **kwargs): # pylint:disable=unused-argument
return select(*args, **kwargs)
selectors.SelectSelector._select = _select
_select._gevent_monkey = True # prove for test cases
# Python 3.7 refactors the poll-like selectors to use a common
# base class and capture a reference to select.poll, etc, at
# import time. selectors tends to get imported early
# (importing 'platform' does it: platform -> subprocess -> selectors),
# so we need to clean that up.
if hasattr(selectors, 'PollSelector') and hasattr(selectors.PollSelector, '_selector_cls'):
selectors.PollSelector._selector_cls = poll
if aggressive:
# If `selectors` had already been imported before we removed
# select.epoll|kqueue|devpoll, these may have been defined in terms
# of those functions. They'll fail at runtime.
patch_request.remove_item(
selectors,
'EpollSelector',
'KqueueSelector',
'DevpollSelector',
)
selectors.DefaultSelector = getattr(
selectors,
'PollSelector',
selectors.SelectSelector
)
# Copyright (c) 2020 gevent contributors.
"""
This module provides :class:`GeventSelector`, a high-level IO
multiplexing mechanism. This is aliased to :class:`DefaultSelector`.
This module provides the same API as the selectors defined in :mod:`selectors`.
On Python 2, this module is only available if the `selectors2
<https://pypi.org/project/selectors2/>`_ backport is installed.
.. versionadded:: NEXT
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import math
try:
import selectors as __selectors__
except ImportError:
# Probably on Python 2. Do we have the backport?
import selectors2 as __selectors__
__target__ = 'selectors2'
from gevent._compat import iteritems
from gevent._util import copy_globals
from gevent.select import poll as Poll
from gevent.select import POLLIN
from gevent.select import POLLOUT
__implements__ = [
'DefaultSelector',
]
__extra__ = [
'GeventSelector',
]
__all__ = __implements__ + __extra__
__imports__ = copy_globals(
__selectors__, globals(),
names_to_ignore=__all__,
# Copy __all__; __all__ is defined by selectors2 but not Python 3.
dunder_names_to_keep=('__all__',)
)
_POLL_ALL = POLLIN | POLLOUT
EVENT_READ = __selectors__.EVENT_READ
EVENT_WRITE = __selectors__.EVENT_WRITE
_ALL_EVENTS = EVENT_READ | EVENT_WRITE
SelectorKey = __selectors__.SelectorKey
# In 3.4 and selectors2, BaseSelector is a concrete
# class that can be called. In 3.5 and later, it's an
# ABC, with the real implementation being
# passed to _BaseSelectorImpl.
_BaseSelectorImpl = getattr(
__selectors__,
'_BaseSelectorImpl',
__selectors__.BaseSelector
)
class GeventSelector(_BaseSelectorImpl):
"""
A selector implementation using gevent primitives.
"""
def __init__(self):
self._poll = Poll()
self._poll._get_started_watchers = self._get_started_watchers
# {fd: watcher}
self._watchers = {}
super(GeventSelector, self).__init__()
def _get_started_watchers(self, watcher_cb):
for fd, watcher in iteritems(self._watchers):
watcher.start(watcher_cb, fd, pass_events=True)
return list(self._watchers.values())
@property
def loop(self):
return self._poll.loop
def register(self, fileobj, events, data=None):
key = _BaseSelectorImpl.register(self, fileobj, events, data)
if events == _ALL_EVENTS:
flags = _POLL_ALL
elif events == EVENT_READ:
flags = POLLIN
else:
flags = POLLOUT
self._poll.register(key.fd, flags)
loop = self.loop
io = loop.io
MAXPRI = loop.MAXPRI
self._watchers[key.fd] = watcher = io(key.fd, self._poll.fds[key.fd])
watcher.priority = MAXPRI
return key
def unregister(self, fileobj):
key = _BaseSelectorImpl.unregister(self, fileobj)
self._poll.unregister(key.fd)
self._watchers.pop(key.fd)
return key
# XXX: Can we implement ``modify`` more efficiently than
# ``unregister()``+``register()``? We could detect the no-change
# case and do nothing; recent versions of the standard library
# do that.
def select(self, timeout=None):
# In https://github.com/gevent/gevent/pull/1523/, it was
# proposed to (essentially) keep the watchers started even
# after the select() call returned *if* the watcher hadn't fired.
# (If it fired, it was stopped). Watchers were started as soon as they
# were registered.
#
# The goal was to minimize the amount of time spent adjusting the
# underlying kernel (epoll) data structures as watchers are started and
# stopped. Events were just collected continually in the background
# in the hopes that they would be retrieved by a future call to
# ```select()``. This method used an ``Event`` to communicate with
# the background ongoing collection of results.
#
# That becomes a problem if the file descriptor is closed while the watcher
# is still active. Certain backends will crash in that case.
# However, the selectors documentation says that files must be
# unregistered before closing, so that's theoretically not a concern
# here.
#
# Also, stopping the watchers if they fired here was said to be
# because "if we did not, someone could call, e.g., gevent.time.sleep and
# any unconsumed bytes on our watched fd would prevent the process from
# sleeping correctly." It's not clear to me (JAM) why that would be the case
# only in the ``select`` method, and not after the watcher was started in
# ``register()``. Actually, it's not clear why it would be a problem at any
# point.
# timeout > 0 : block seconds
# timeout <= 0 : No blocking.
# timeout = None: Block forever
#
# Meanwhile, for poll():
# timeout None: block forever
# timeout omitted: block forever
# timeout < 0: block forever
# timeout anything else: block that long in *milliseconds*
if timeout is not None:
if timeout <= 0:
# Asked not to block.
timeout = 0
else:
# Convert seconds to ms.
# poll() has a resolution of 1 millisecond, round away from
# zero to wait *at least* timeout seconds.
timeout = math.ceil(timeout * 1e3)
poll_events = self._poll.poll(timeout)
result = []
for fd, event in poll_events:
key = self._key_from_fd(fd)
if not key:
continue
events = 0
if event & POLLOUT:
events |= EVENT_WRITE
if event & POLLIN:
events |= EVENT_READ
result.append((key, events & key.events))
return result
def close(self):
self._poll = None # Nothing to do, just drop it
for watcher in self._watchers.values() if self._watchers else ():
watcher.stop()
watcher.close()
self._watchers = None
_BaseSelectorImpl.close(self)
DefaultSelector = GeventSelector
def _gevent_do_monkey_patch(patch_request):
aggressive = patch_request.patch_kwargs['aggressive']
target_mod = patch_request.target_module
patch_request.default_patch_items()
import sys
if 'selectors' not in sys.modules:
# Py2: Make 'import selectors' work
sys.modules['selectors'] = sys.modules[__name__]
# Python 3 wants to use `select.select` as a member function,
# leading to this error in selectors.py (because
# gevent.select.select is not a builtin and doesn't get the
# magic auto-static that they do):
#
# r, w, _ = self._select(self._readers, self._writers, [], timeout)
# TypeError: select() takes from 3 to 4 positional arguments but 5 were given
#
# Note that this obviously only happens if selectors was
# imported after we had patched select; but there is a code
# path that leads to it being imported first (but now we've
# patched select---so we can't compare them identically). It also doesn't
# happen on Windows, because they define a normal method for _select, to work around
# some weirdness in the handling of the third argument.
#
# The backport doesn't have that.
orig_select_select = patch_request.get_original('select', 'select')
assert target_mod.select is not orig_select_select
selectors = __selectors__
SelectSelector = selectors.SelectSelector
if hasattr(SelectSelector, '_select') and SelectSelector._select in (
target_mod.select, orig_select_select
):
from gevent.select import select
def _select(self, *args, **kwargs): # pylint:disable=unused-argument
return select(*args, **kwargs)
selectors.SelectSelector._select = _select
_select._gevent_monkey = True # prove for test cases
if aggressive:
# If `selectors` had already been imported before we removed
# select.epoll|kqueue|devpoll, these may have been defined in terms
# of those functions. They'll fail at runtime.
patch_request.remove_item(
selectors,
'EpollSelector',
'KqueueSelector',
'DevpollSelector',
)
selectors.DefaultSelector = DefaultSelector
# Python 3.7 refactors the poll-like selectors to use a common
# base class and capture a reference to select.poll, etc, at
# import time. selectors tends to get imported early
# (importing 'platform' does it: platform -> subprocess -> selectors),
# so we need to clean that up.
if hasattr(selectors, 'PollSelector') and hasattr(selectors.PollSelector, '_selector_cls'):
selectors.PollSelector._selector_cls = Poll
......@@ -174,7 +174,7 @@ class AbstractTestMixin(object):
return
if self.__implements__ is not None and self.stdlib_module is None:
raise AssertionError(
'%s (%r) has __implements__ (%s) but no stdlib counterpart (%s)'
'%s (%r) has __implements__ (%s) but no stdlib counterpart module exists (%s)'
% (self.modname, self.module, self.__implements__, self.stdlib_name))
@skip_if_no_stdlib_counterpart
......
......@@ -4,7 +4,7 @@ try:
# things up properly if the order is wrong.
import selectors
except ImportError:
selectors = None
import selectors2 as selectors
import socket
import gevent
......@@ -14,12 +14,15 @@ import gevent.testing as greentest
patch_all()
@greentest.skipIf(
selectors is None,
"selectors module not present"
)
from gevent.selectors import DefaultSelector
from gevent.selectors import GeventSelector
class TestSelectors(greentest.TestCase):
@greentest.skipOnPy2(
'selectors2 backport does not use _select'
)
@greentest.skipOnWindows(
"SelectSelector._select is a normal function on Windows"
)
......@@ -28,15 +31,17 @@ class TestSelectors(greentest.TestCase):
_select = selectors.SelectSelector._select
self.assertIn('_gevent_monkey', dir(_select))
@greentest.skipUnless(
hasattr(selectors, 'PollSelector'),
"Needs gevent.select.poll"
)
def test_poll_is_default(self):
def test_default(self):
# Depending on the order of imports, gevent.select.poll may be defined but
# selectors.PollSelector may not be defined.
# https://github.com/gevent/gevent/issues/1466
self.assertIs(selectors.DefaultSelector, selectors.PollSelector)
self.assertIs(DefaultSelector, GeventSelector)
self.assertIs(selectors.DefaultSelector, GeventSelector)
def test_import_selectors(self):
# selectors can always be imported. On Python 2,
# this is an alias for gevent.selectors.
__import__('selectors')
def _check_selector(self, sel):
def read(conn, _mask):
......@@ -74,8 +79,8 @@ class TestSelectors(greentest.TestCase):
self.skipTest(name + ' is not defined')
else:
def m(self, k=kind):
sel = k()
self._check_selector(sel)
with k() as sel:
self._check_selector(sel)
m.__name__ = 'test_selector_' + name
return m
......@@ -90,8 +95,13 @@ class TestSelectors(greentest.TestCase):
'DevpollSelector',
'PollSelector',
'SelectSelector',
GeventSelector,
):
SelKind = getattr(selectors, SelKindName, None)
if not isinstance(SelKindName, type):
SelKind = getattr(selectors, SelKindName, None)
else:
SelKind = SelKindName
SelKindName = SelKind.__name__
m = _make_test(SelKindName, SelKind)
locals()[m.__name__] = m
......@@ -100,5 +110,6 @@ class TestSelectors(greentest.TestCase):
del _make_test
if __name__ == '__main__':
greentest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment