Commit 0f086770 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1636 from gevent/issue1532

Add gevent.selectors
parents f0e3bb96 54df0039
......@@ -40,6 +40,11 @@ environment:
# a later point release.
# 64-bit
- PYTHON: "C:\\Python27-x64"
PYTHON_VERSION: "2.7.x" # currently 2.7.13
PYTHON_ARCH: "64"
PYTHON_EXE: python
- PYTHON: "C:\\Python38-x64"
PYTHON_VERSION: "3.8.x"
PYTHON_ARCH: "64"
......@@ -50,11 +55,6 @@ environment:
PYTHON_ARCH: "64"
PYTHON_EXE: python
- PYTHON: "C:\\Python27-x64"
PYTHON_VERSION: "2.7.x" # currently 2.7.13
PYTHON_ARCH: "64"
PYTHON_EXE: python
- PYTHON: "C:\\Python36-x64"
PYTHON_VERSION: "3.6.x" # currently 3.6.0
PYTHON_ARCH: "64"
......
=======================================================
:mod:`gevent.selectors` -- High-level IO Multiplexing
=======================================================
.. automodule:: gevent.selectors
:members:
Add ``gevent.selectors`` containing ``GeventSelector``. This selector
implementation uses gevent details to attempt to reduce overhead when
polling many file descriptors, only some of which become ready at any
given time.
This is monkey-patched as ``selectors.DefaultSelector`` by default.
This is available on Python 2 if the ``selectors2`` backport is
installed. (This backport is installed automatically using the
``recommended`` extra.) When monkey-patching, ``selectors`` is made
available as an alias to this module.
......@@ -146,7 +146,13 @@ monitor
build on all platforms.)
recommended
A shortcut for installing suggested extras together.
A shortcut for installing suggested extras together. This includes
the non-test extras defined here, plus:
- `backports.socketpair
<https://pypi.org/project/backports.socketpair/>`_ on Python
2/Windows (beginning with release 20.6.0);
- `selectors2 <https://pypi.org/project/selectors2/>`_ on Python 2 (beginning with release 20.6.0).
test
Everything needed to run the complete gevent test suite.
......
......@@ -315,6 +315,10 @@ EXTRA_MONITOR = [
EXTRA_RECOMMENDED = [
# We need this at runtime to use the libev-CFFI and libuv backends
CFFI_DEP,
# Backport of selectors module to Python 2
'selectors2 ; python_version == "2.7"',
# Backport of socket.socketpair to Python 2; only needed on Windows
'backports.socketpair ; python_version == "2.7" and sys_platform == "win32"',
] + EXTRA_DNSPYTHON + EXTRA_EVENTS + EXTRA_MONITOR
......
......@@ -194,6 +194,10 @@ class AbstractLinkable(object):
# The object itself becomes false in a boolean way as soon
# as this method returns.
notifier = self._notifier
if notifier is None:
# XXX: How did we get here?
self._check_and_notify()
return
# Early links are allowed to remove later links, and links
# are allowed to add more links, thus we must not
# make a copy of our the ``_links`` list, we must traverse it and
......
......@@ -25,6 +25,7 @@ MAPPING = {
'gevent.local': '_threading_local',
'gevent.socket': 'socket',
'gevent.select': 'select',
'gevent.selectors': 'selectors' if PY3 else 'selectors2',
'gevent.ssl': 'ssl',
'gevent.thread': '_thread' if PY3 else 'thread',
'gevent.subprocess': 'subprocess',
......
......@@ -460,7 +460,8 @@ class socket(_socketcommon.SocketMixin):
SocketType = socket
if hasattr(_socket, 'socketpair'):
# The native, low-level socketpair returns
# low-level objects
def socketpair(family=getattr(_socket, 'AF_UNIX', _socket.AF_INET),
type=_socket.SOCK_STREAM, proto=0):
one, two = _socket.socketpair(family, type, proto)
......@@ -469,6 +470,20 @@ if hasattr(_socket, 'socketpair'):
one._drop()
two._drop()
return result
elif hasattr(__socket__, 'socketpair'):
# The high-level backport uses high-level socket APIs. It works
# cooperatively automatically if we're monkey-patched,
# else we must do it ourself.
_orig_socketpair = __socket__.socketpair
def socketpair(family=_socket.AF_INET, type=_socket.SOCK_STREAM, proto=0):
one, two = _orig_socketpair(family, type, proto)
if not isinstance(one, socket):
one = socket(_sock=one)
two = socket(_sock=two)
if PYPY:
one._drop()
two._drop()
return one, two
elif 'socketpair' in __implements__:
__implements__.remove('socketpair')
......
......@@ -128,6 +128,11 @@ if is_macos:
import _socket
_realsocket = _socket.socket
import socket as __socket__
try:
# Provide implementation of socket.socketpair on Windows < 3.5.
import backports.socketpair
except ImportError:
pass
_name = _value = None
__imports__ = copy_globals(__socket__, globals(),
......
......@@ -415,6 +415,20 @@ def patch_module(target_module, source_module, items=None,
return True
def _check_availability(name):
"""
Test that the source and target modules for *name* are
available and return them.
:raise ImportError: If the source or target cannot be imported.
:return: The tuple ``(gevent_module, target_module, target_module_name)``
"""
gevent_module = getattr(__import__('gevent.' + name), name)
target_module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(target_module_name)
return gevent_module, target_module, target_module_name
def _patch_module(name,
items=None,
_warnings=None,
......@@ -423,9 +437,7 @@ def _patch_module(name,
_notify_did_subscribers=True,
_call_hooks=True):
gevent_module = getattr(__import__('gevent.' + name), name)
module_name = getattr(gevent_module, '__target__', name)
target_module = __import__(module_name)
gevent_module, target_module, target_module_name = _check_availability(name)
patch_module(target_module, gevent_module, items=items,
_warnings=_warnings, _patch_kwargs=_patch_kwargs,
......@@ -451,7 +463,7 @@ def _patch_module(name,
_notify_will_subscribers=False,
_notify_did_subscribers=False,
_call_hooks=False)
saved[alternate_name] = saved[module_name]
saved[alternate_name] = saved[target_module_name]
return gevent_module, target_module
......@@ -1013,20 +1025,48 @@ def patch_select(aggressive=True):
and :func:`select.poll` with :class:`gevent.select.poll` (where available).
If ``aggressive`` is true (the default), also remove other
blocking functions from :mod:`select` and (on Python 3.4 and
above) :mod:`selectors`:
blocking functions from :mod:`select` .
- :func:`select.epoll`
- :func:`select.kqueue`
- :func:`select.kevent`
- :func:`select.devpoll` (Python 3.5+)
"""
_patch_module('select',
_patch_kwargs={'aggressive': aggressive})
@_ignores_DoNotPatch
def patch_selectors(aggressive=True):
"""
Replace :class:`selectors.DefaultSelector` with
:class:`gevent.selectors.GeventSelector`.
If ``aggressive`` is true (the default), also remove other
blocking classes :mod:`selectors`:
- :class:`selectors.EpollSelector`
- :class:`selectors.KqueueSelector`
- :class:`selectors.DevpollSelector` (Python 3.5+)
On Python 2, the :mod:`selectors2` module is used instead
of :mod:`selectors` if it is available. If this module cannot
be imported, no patching is done and :mod:`gevent.selectors` is
not available.
In :func:`patch_all`, the *select* argument controls both this function
and :func:`patch_select`.
.. versionadded:: NEXT
"""
_patch_module('select',
try:
_check_availability('selectors')
except ImportError: # pragma: no cover
return
_patch_module('selectors',
_patch_kwargs={'aggressive': aggressive})
@_ignores_DoNotPatch
def patch_subprocess():
"""
......@@ -1178,6 +1218,7 @@ def patch_all(socket=True, dns=True, time=True, select=True, thread=True, os=Tru
patch_socket(dns=dns, aggressive=aggressive)
if select:
patch_select(aggressive=aggressive)
patch_selectors(aggressive=aggressive)
if ssl:
patch_ssl(_warnings=_warnings, _first_time=first_time)
if subprocess:
......
......@@ -258,6 +258,25 @@ class poll(object):
def modify(self, fd, eventmask):
self.register(fd, eventmask)
def _get_started_watchers(self, watcher_cb):
watchers = []
io = self.loop.io
MAXPRI = self.loop.MAXPRI
try:
for fd, flags in iteritems(self.fds):
watcher = io(fd, flags)
watchers.append(watcher)
watcher.priority = MAXPRI
watcher.start(watcher_cb, fd, pass_events=True)
except:
for awatcher in watchers:
awatcher.stop()
awatcher.close()
raise
return watchers
def poll(self, timeout=None):
"""
poll the registered fds.
......@@ -270,15 +289,8 @@ class poll(object):
i.e., block. This was always the case with libev.
"""
result = PollResult()
watchers = []
io = self.loop.io
MAXPRI = self.loop.MAXPRI
watchers = self._get_started_watchers(result.add_event)
try:
for fd, flags in iteritems(self.fds):
watcher = io(fd, flags)
watchers.append(watcher)
watcher.priority = MAXPRI
watcher.start(result.add_event, fd, pass_events=True)
if timeout is not None:
if timeout < 0:
# The docs for python say that an omitted timeout,
......@@ -320,7 +332,6 @@ class poll(object):
def _gevent_do_monkey_patch(patch_request):
aggressive = patch_request.patch_kwargs['aggressive']
target_mod = patch_request.target_module
patch_request.default_patch_items()
......@@ -334,55 +345,3 @@ def _gevent_do_monkey_patch(patch_request):
'kevent',
'devpoll',
)
if patch_request.PY3:
# TODO: Do we need to broadcast events about patching the selectors
# package? If so, must be careful to deal with DoNotPatch exceptions.
# Python 3 wants to use `select.select` as a member function,
# leading to this error in selectors.py (because
# gevent.select.select is not a builtin and doesn't get the
# magic auto-static that they do):
#
# r, w, _ = self._select(self._readers, self._writers, [], timeout)
# TypeError: select() takes from 3 to 4 positional arguments but 5 were given
#
# Note that this obviously only happens if selectors was
# imported after we had patched select; but there is a code
# path that leads to it being imported first (but now we've
# patched select---so we can't compare them identically). It also doesn't
# happen on Windows, because they define a normal method for _select, to work around
# some weirdness in the handling of the third argument.
orig_select_select = patch_request.get_original('select', 'select')
assert target_mod.select is not orig_select_select
selectors = __import__('selectors')
if selectors.SelectSelector._select in (target_mod.select, orig_select_select):
def _select(self, *args, **kwargs): # pylint:disable=unused-argument
return select(*args, **kwargs)
selectors.SelectSelector._select = _select
_select._gevent_monkey = True # prove for test cases
# Python 3.7 refactors the poll-like selectors to use a common
# base class and capture a reference to select.poll, etc, at
# import time. selectors tends to get imported early
# (importing 'platform' does it: platform -> subprocess -> selectors),
# so we need to clean that up.
if hasattr(selectors, 'PollSelector') and hasattr(selectors.PollSelector, '_selector_cls'):
selectors.PollSelector._selector_cls = poll
if aggressive:
# If `selectors` had already been imported before we removed
# select.epoll|kqueue|devpoll, these may have been defined in terms
# of those functions. They'll fail at runtime.
patch_request.remove_item(
selectors,
'EpollSelector',
'KqueueSelector',
'DevpollSelector',
)
selectors.DefaultSelector = getattr(
selectors,
'PollSelector',
selectors.SelectSelector
)
This diff is collapsed.
......@@ -259,14 +259,6 @@ class Definitions(DefinitionsBase):
when=APPVEYOR & PY3
)
test__socketpair = Ignored(
"""
Py35 added socket.socketpair, all other releases
are missing it. No reason to even test it.
""",
when=WIN & PY2
)
test_ftplib = Flaky(
r"""
could be a problem of appveyor - not sure
......
......@@ -174,7 +174,7 @@ class AbstractTestMixin(object):
return
if self.__implements__ is not None and self.stdlib_module is None:
raise AssertionError(
'%s (%r) has __implements__ (%s) but no stdlib counterpart (%s)'
'%s (%r) has __implements__ (%s) but no stdlib counterpart module exists (%s)'
% (self.modname, self.module, self.__implements__, self.stdlib_name))
@skip_if_no_stdlib_counterpart
......
......@@ -4,22 +4,22 @@ try:
# things up properly if the order is wrong.
import selectors
except ImportError:
selectors = None
import socket
import gevent
import selectors2 as selectors
from gevent.monkey import patch_all
import gevent.testing as greentest
patch_all()
@greentest.skipIf(
selectors is None,
"selectors module not present"
)
class TestSelectors(greentest.TestCase):
from gevent.selectors import DefaultSelector
from gevent.selectors import GeventSelector
from gevent.tests.test__selectors import SelectorTestMixin
class TestSelectors(SelectorTestMixin, greentest.TestCase):
@greentest.skipOnPy2(
'selectors2 backport does not use _select'
)
@greentest.skipOnWindows(
"SelectSelector._select is a normal function on Windows"
)
......@@ -28,45 +28,17 @@ class TestSelectors(greentest.TestCase):
_select = selectors.SelectSelector._select
self.assertIn('_gevent_monkey', dir(_select))
@greentest.skipUnless(
hasattr(selectors, 'PollSelector'),
"Needs gevent.select.poll"
)
def test_poll_is_default(self):
def test_default(self):
# Depending on the order of imports, gevent.select.poll may be defined but
# selectors.PollSelector may not be defined.
# https://github.com/gevent/gevent/issues/1466
self.assertIs(selectors.DefaultSelector, selectors.PollSelector)
def _check_selector(self, sel):
def read(conn, _mask):
data = conn.recv(100) # Should be ready
if data:
conn.send(data) # Hope it won't block
sel.unregister(conn)
conn.close()
def run_selector_once():
events = sel.select()
for key, mask in events:
key.data(key.fileobj, mask)
sock1, sock2 = socket.socketpair()
try:
sel.register(sock1, selectors.EVENT_READ, read)
glet = gevent.spawn(run_selector_once)
DATA = b'abcdef'
sock2.send(DATA)
data = sock2.recv(50)
self.assertEqual(data, DATA)
finally:
sel.close()
sock1.close()
sock2.close()
glet.join(10)
self.assertTrue(glet.ready())
self.assertIs(DefaultSelector, GeventSelector)
self.assertIs(selectors.DefaultSelector, GeventSelector)
def test_import_selectors(self):
# selectors can always be imported once monkey-patched. On Python 2,
# this is an alias for gevent.selectors.
__import__('selectors')
def _make_test(name, kind): # pylint:disable=no-self-argument
if kind is None:
......@@ -74,8 +46,8 @@ class TestSelectors(greentest.TestCase):
self.skipTest(name + ' is not defined')
else:
def m(self, k=kind):
sel = k()
self._check_selector(sel)
with k() as sel:
self._check_selector(sel)
m.__name__ = 'test_selector_' + name
return m
......@@ -90,8 +62,13 @@ class TestSelectors(greentest.TestCase):
'DevpollSelector',
'PollSelector',
'SelectSelector',
GeventSelector,
):
SelKind = getattr(selectors, SelKindName, None)
if not isinstance(SelKindName, type):
SelKind = getattr(selectors, SelKindName, None)
else:
SelKind = SelKindName
SelKindName = SelKind.__name__
m = _make_test(SelKindName, SelKind)
locals()[m.__name__] = m
......@@ -100,5 +77,6 @@ class TestSelectors(greentest.TestCase):
del _make_test
if __name__ == '__main__':
greentest.main()
# Tests for gevent.selectors in its native form, without
# monkey-patching.
import gevent
from gevent import socket
from gevent import selectors
import gevent.testing as greentest
class SelectorTestMixin(object):
@staticmethod
def run_selector_once(sel, timeout=3):
# Run in a background greenlet, leaving the main
# greenlet free to send data.
events = sel.select(timeout=timeout)
for key, mask in events:
key.data(sel, key.fileobj, mask)
gevent.sleep()
unregister_after_send = True
def read_from_ready_socket_and_reply(self, selector, conn, _events):
data = conn.recv(100) # Should be ready
if data:
conn.send(data) # Hope it won't block
# Must unregister before we close.
if self.unregister_after_send:
selector.unregister(conn)
conn.close()
def _check_selector(self, sel):
server, client = socket.socketpair()
try:
sel.register(server, selectors.EVENT_READ, self.read_from_ready_socket_and_reply)
glet = gevent.spawn(self.run_selector_once, sel)
DATA = b'abcdef'
client.send(DATA)
data = client.recv(50) # here is probably where we yield to the event loop
self.assertEqual(data, DATA)
finally:
sel.close()
server.close()
client.close()
glet.join(10)
self.assertTrue(glet.ready())
class GeventSelectorTest(SelectorTestMixin,
greentest.TestCase):
def test_select_using_socketpair(self):
# Basic test.
with selectors.GeventSelector() as sel:
self._check_selector(sel)
def test_select_many_sockets(self):
try:
AF_UNIX = socket.AF_UNIX
except AttributeError:
AF_UNIX = None
pairs = [socket.socketpair() for _ in range(10)]
try:
server_sel = selectors.GeventSelector()
client_sel = selectors.GeventSelector()
for i, pair in enumerate(pairs):
server, client = pair
server_sel.register(server, selectors.EVENT_READ,
self.read_from_ready_socket_and_reply)
client_sel.register(client, selectors.EVENT_READ, i)
# Prime them all to be ready at once.
data = str(i).encode('ascii')
client.send(data)
# Read and reply to all the clients..
# Everyone should be ready, so we ask not to block.
# The call to gevent.idle() is there to make sure that
# all event loop implementations (looking at you, libuv)
# get a chance to poll for IO. Without it, libuv
# doesn't find any results here.
# Not blocking only works for AF_UNIX sockets, though.
# If we got AF_INET (Windows) the data may need some time to
# traverse through the layers.
gevent.idle()
self.run_selector_once(
server_sel,
timeout=-1 if pairs[0][0].family == AF_UNIX else 3)
found = 0
for key, _ in client_sel.select(timeout=3):
expected = str(key.data).encode('ascii')
data = key.fileobj.recv(50)
self.assertEqual(data, expected)
found += 1
self.assertEqual(found, len(pairs))
finally:
server_sel.close()
client_sel.close()
for pair in pairs:
for s in pair:
s.close()
if __name__ == '__main__':
greentest.main()
......@@ -308,6 +308,7 @@ class TestDefaultSpawn(TestCase):
with self.assertRaises(TypeError):
self.ServerClass(self.get_listener(), backlog=25)
@greentest.skipOnLibuvOnCIOnPyPy("Sometimes times out")
def test_backlog_is_accepted_for_address(self):
self.server = self.ServerSubClass((greentest.DEFAULT_BIND_ADDR, 0), backlog=25)
self.assertConnectionRefused()
......
......@@ -15,6 +15,8 @@ class TestSocketpair(unittest.TestCase):
self.assertEqual(msg, read)
y.close()
@unittest.skipUnless(hasattr(socket, 'fromfd'),
'Needs socket.fromfd')
def test_fromfd(self):
msg = b'hello world'
x, y = socket.socketpair()
......
......@@ -438,7 +438,7 @@ class TestMaxsize(TestCase):
pool.spawn(sleep, 0.2)
pool.spawn(sleep, 0.3)
gevent.sleep(0.2)
self.assertEqual(pool.size, 3)
self.assertGreaterEqual(pool.size, 2)
pool.maxsize = 0
gevent.sleep(0.2)
self.assertEqualFlakyRaceCondition(pool.size, 0)
......
......@@ -743,8 +743,9 @@ else:
future = self._threadpool.spawn(fn, *args, **kwargs)
return _FutureProxy(future)
def shutdown(self, wait=True):
super(ThreadPoolExecutor, self).shutdown(wait)
def shutdown(self, wait=True, **kwargs): # pylint:disable=arguments-differ
# In 3.9, this added ``cancel_futures=False``
super(ThreadPoolExecutor, self).shutdown(wait, **kwargs)
# XXX: We don't implement wait properly
kill = getattr(self._threadpool, 'kill', None)
if kill: # pylint:disable=using-constant-test
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment