Commit 58fcd584 authored by Jason Madden's avatar Jason Madden Committed by GitHub

Merge pull request #1316 from gevent/socket-close-fix

Python 2: Correct errors from async closed sockets.
parents 305b60f7 222ffb2c
......@@ -37,9 +37,13 @@ exclude_lines =
if sys.platform == 'win32':
if mswindows:
if is_windows:
if WIN:
self.fail
omit =
# local.so sometimes gets included, and it can't be parsed
# as source, so it fails the whole process.
# coverage 4.5 needs this specified here, 4.4.2 needed it in [run]
*.so
/tmp/test_*
# Third-party vendored code
src/gevent/_tblib.py
......@@ -58,6 +58,11 @@
- Make `gevent.util.assert_switches` produce more informative messages
when the assertion fails.
- Python 2: If a `gevent.socket` was closed asynchronously (in a
different greenlet or a hub callback), `AttributeError` could result
if the socket was already in use. Now the correct socket.error
should be raised.
1.3.7 (2018-10-12)
==================
......
......@@ -159,8 +159,10 @@ del sys
# the following makes hidden imports visible to freezing tools like
# py2exe. see https://github.com/gevent/gevent/issues/181
# This is not well maintained or tested, though, so it likely becomes
# outdated on each major release.
def __dependencies_for_freezing():
def __dependencies_for_freezing(): # pragma: no cover
# pylint:disable=unused-variable
from gevent import core
from gevent import resolver_thread
......
......@@ -181,7 +181,9 @@ class AbstractLinkable(object):
def _wait_return_value(self, waited, wait_success):
# pylint:disable=unused-argument
return None
# Subclasses should override this to return a value from _wait.
# By default we return None.
return None # pragma: no cover all extent subclasses override
def _wait(self, timeout=None):
if self.ready():
......
......@@ -282,6 +282,10 @@ def _primitive_wait(watcher, timeout, timeout_exc, hub):
# Suitable to be bound as an instance method
def wait_on_socket(socket, watcher, timeout_exc=None):
if socket is None or watcher is None:
# test__hub TestCloseSocketWhilePolling, on Python 2; Python 3
# catches the EBADF differently.
raise ConcurrentObjectUseError("The socket has already been closed by another greenlet")
_primitive_wait(watcher, socket.timeout,
timeout_exc if timeout_exc is not None else _NONE,
socket.hub)
......
......@@ -185,10 +185,9 @@ class socket(object):
_wait = _wait_on_socket
def accept(self):
sock = self._sock
while True:
while 1:
try:
client_socket, address = sock.accept()
client_socket, address = self._sock.accept()
break
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
......@@ -212,10 +211,17 @@ class socket(object):
def close(self, _closedsocket=_closedsocket):
# This function should not reference any globals. See Python issue #808164.
# Also break any reference to the loop.io objects. Our fileno, which they were
# tied to, is now free to be reused, so these objects are no longer functional.
# Also break any reference to the loop.io objects. Our fileno,
# which they were tied to, is now free to be reused, so these
# objects are no longer functional.
self._drop_events()
s = self._sock
# Note that we change self._sock at this point. Methods *must not*
# cache `self._sock` separately from self._write_event/self._read_event,
# or they will be out of sync and we may get inappropriate errors.
# (See test__hub:TestCloseSocketWhilePolling for an example).
self._sock = _closedsocket()
if PYPY:
s._drop()
......@@ -227,16 +233,16 @@ class socket(object):
def connect(self, address):
if self.timeout == 0.0:
return self._sock.connect(address)
sock = self._sock
address = _socketcommon._resolve_addr(sock, address)
address = _socketcommon._resolve_addr(self._sock, address)
timer = Timeout._start_new_or_dummy(self.timeout, timeout('timed out'))
try:
while True:
err = sock.getsockopt(SOL_SOCKET, SO_ERROR)
while 1:
err = self._sock.getsockopt(SOL_SOCKET, SO_ERROR)
if err:
raise error(err, strerror(err))
result = sock.connect_ex(address)
result = self._sock.connect_ex(address)
if not result or result == EISCONN:
break
elif (result in (EWOULDBLOCK, EINPROGRESS, EALREADY)) or (result == EINVAL and is_windows):
......@@ -283,10 +289,9 @@ class socket(object):
return fobj
def recv(self, *args):
sock = self._sock # keeping the reference so that fd is not closed during waiting
while True:
while 1:
try:
return sock.recv(*args)
return self._sock.recv(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
......@@ -295,10 +300,9 @@ class socket(object):
self._wait(self._read_event)
def recvfrom(self, *args):
sock = self._sock
while True:
while 1:
try:
return sock.recvfrom(*args)
return self._sock.recvfrom(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
......@@ -306,10 +310,9 @@ class socket(object):
self._wait(self._read_event)
def recvfrom_into(self, *args):
sock = self._sock
while True:
while 1:
try:
return sock.recvfrom_into(*args)
return self._sock.recvfrom_into(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
......@@ -317,10 +320,9 @@ class socket(object):
self._wait(self._read_event)
def recv_into(self, *args):
sock = self._sock
while True:
while 1:
try:
return sock.recv_into(*args)
return self._sock.recv_into(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
......@@ -328,18 +330,17 @@ class socket(object):
self._wait(self._read_event)
def send(self, data, flags=0, timeout=timeout_default):
sock = self._sock
if timeout is timeout_default:
timeout = self.timeout
try:
return sock.send(data, flags)
return self._sock.send(data, flags)
except error as ex:
if ex.args[0] not in _socketcommon.GSENDAGAIN or timeout == 0.0:
raise
sys.exc_clear()
self._wait(self._write_event)
try:
return sock.send(data, flags)
return self._sock.send(data, flags)
except error as ex2:
if ex2.args[0] == EWOULDBLOCK:
return 0
......@@ -354,16 +355,15 @@ class socket(object):
return _socketcommon._sendall(self, data_memory, flags)
def sendto(self, *args):
sock = self._sock
try:
return sock.sendto(*args)
return self._sock.sendto(*args)
except error as ex:
if ex.args[0] != EWOULDBLOCK or self.timeout == 0.0:
raise
sys.exc_clear()
self._wait(self._write_event)
try:
return sock.sendto(*args)
return self._sock.sendto(*args)
except error as ex2:
if ex2.args[0] == EWOULDBLOCK:
return 0
......
......@@ -68,6 +68,7 @@ from .skipping import skipOnPyPy
from .skipping import skipOnPyPyOnCI
from .skipping import skipOnPyPy3
from .skipping import skipIf
from .skipping import skipUnless
from .skipping import skipOnLibev
from .skipping import skipOnLibuv
from .skipping import skipOnLibuvOnWin
......@@ -122,6 +123,12 @@ from .flaky import reraiseFlakyTestRaceCondition
from .flaky import reraises_flaky_timeout
from .flaky import reraises_flaky_race_condition
def gc_collect_if_needed():
"Collect garbage if necessary for destructors to run"
import gc
if PYPY: # pragma: no cover
gc.collect()
try:
from unittest import mock
except ImportError: # Python 2
......
......@@ -466,6 +466,8 @@ if LIBUV:
'test_socket.BufferIOTest.testRecvFromIntoBytearray',
'test_socket.BufferIOTest.testRecvFromIntoArray',
'test_socket.BufferIOTest.testRecvFromIntoEmptyBuffer',
'test_socket.BufferIOTest.testRecvFromIntoMemoryview',
'test_socket.BufferIOTest.testRecvFromIntoSmallBuffer',
]
if PY3:
......
......@@ -92,6 +92,7 @@ if sysinfo.PYPY:
skipUnderCoverage = unittest.skip if sysinfo.RUN_COVERAGE else _do_not_skip
skipIf = unittest.skipIf
skipUnless = unittest.skipUnless
......
......@@ -106,7 +106,7 @@ class TestPeriodicMonitoringThread(_AbstractTestPeriodicMonitoringThread,
self.assertRaises(ValueError, self.pmt.add_monitoring_function, lambda: None, -1)
def f():
pass
"Does nothing"
# Add
self.pmt.add_monitoring_function(f, 1)
......
......@@ -8,7 +8,7 @@ if sys.argv[1:] == []:
os.environ['GEVENT_BACKEND'] = 'select'
popen = subprocess.Popen([sys.executable, __file__, '1'])
assert popen.wait() == 0, popen.poll()
else:
else: # pragma: no cover
hub = gevent.get_hub()
if 'select' in gevent.core.supported_backends():
assert hub.loop.backend == 'select', hub.loop.backend
......
......@@ -35,8 +35,8 @@ class Test(greentest.TestCase):
g = gevent.spawn(hello, expected_error)
g.join()
self.assert_error(ExpectedError, expected_error)
if not isinstance(g.exception, ExpectedError):
raise g.exception
self.assertIsInstance(g.exception, ExpectedError)
try:
raise
except: # pylint:disable=bare-except
......
......@@ -165,9 +165,6 @@ def return25():
return 25
def sleep0():
return sleep(0)
class TestReturn_link(LinksTestCase):
link_method = 'link'
......@@ -748,7 +745,7 @@ class TestBasic(greentest.TestCase):
raise ValueError("call stack is not deep enough")
try:
ogf = greenlet.sys_getframe
except AttributeError:
except AttributeError: # pragma: no cover
# Must be running cython compiled
raise unittest.SkipTest("Cannot mock when Cython compiled")
greenlet.sys_getframe = get
......@@ -806,7 +803,7 @@ class TestRef(greentest.TestCase):
@greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase):
class TestCExt(greentest.TestCase): # pragma: no cover (we only do coverage on pure-Python)
def test_c_extension(self):
self.assertEqual(greenlet.Greenlet.__module__,
......
......@@ -149,13 +149,11 @@ class Test(greentest.TestCase):
s = set()
s.add(p1)
s.add(p2)
try:
with self.assertRaises(Timeout):
gevent.killall(s, timeout=0.5)
except Timeout:
for g in s:
assert not g.dead
else:
self.fail("Should raise timeout")
for g in s:
self.assertFalse(g.dead, g)
class GreenletSubclass(gevent.Greenlet):
......
......@@ -36,10 +36,11 @@ DELAY = 0.1
class TestCloseSocketWhilePolling(greentest.TestCase):
def test(self):
with self.assertRaises(Exception):
sock = socket.socket()
self._close_on_teardown(sock)
t = get_hub().loop.timer(0, sock.close)
sock = socket.socket()
self._close_on_teardown(sock)
t = get_hub().loop.timer(0)
t.start(sock.close)
with self.assertRaises(socket.error):
try:
sock.connect(('python.org', 81))
finally:
......
......@@ -14,7 +14,7 @@ class TestSwitch(greentest.TestCase):
self.switched_to = [False, False]
self.caught = None
def runner(self, i):
def should_never_run(self, i): # pragma: no cover
self.switched_to[i] = True
def check(self, g, g2):
......@@ -33,8 +33,8 @@ class TestSwitch(greentest.TestCase):
def test_gevent_kill(self):
g = gevent.spawn(self.runner, 0) # create but do not switch to
g2 = gevent.spawn(self.runner, 1) # create but do not switch to
g = gevent.spawn(self.should_never_run, 0) # create but do not switch to
g2 = gevent.spawn(self.should_never_run, 1) # create but do not switch to
# Using gevent.kill
gevent.kill(g)
gevent.kill(g2)
......@@ -42,16 +42,16 @@ class TestSwitch(greentest.TestCase):
def test_greenlet_kill(self):
# killing directly
g = gevent.spawn(self.runner, 0)
g2 = gevent.spawn(self.runner, 1)
g = gevent.spawn(self.should_never_run, 0)
g2 = gevent.spawn(self.should_never_run, 1)
g.kill()
g2.kill()
self.check(g, g2)
def test_throw(self):
# throwing
g = gevent.spawn(self.runner, 0)
g2 = gevent.spawn(self.runner, 1)
g = gevent.spawn(self.should_never_run, 0)
g2 = gevent.spawn(self.should_never_run, 1)
g.throw(gevent.GreenletExit)
g2.throw(gevent.GreenletExit)
self.check(g, g2)
......
......@@ -16,7 +16,7 @@ if not sys.argv[1:]:
# or __package__, falling back on __name__ and __path__\n return f(*args, **kwds)\n'
assert err == b'' or b'sys.excepthook' in err or b'Warning' in err, (out, err, code)
elif sys.argv[1:] == ['subprocess']:
elif sys.argv[1:] == ['subprocess']: # pragma: no cover
import gevent
import gevent.monkey
gevent.monkey.patch_all(sys=True)
......@@ -30,5 +30,5 @@ elif sys.argv[1:] == ['subprocess']:
gevent.spawn(printline).join()
else:
else: # pragma: no cover
sys.exit('Invalid arguments: %r' % (sys.argv, ))
......@@ -8,7 +8,7 @@ handling of KeyboardInterrupt.
import sys
if sys.argv[1:] == ['subprocess']:
if sys.argv[1:] == ['subprocess']: # pragma: no cover
import gevent
def task():
......
......@@ -277,9 +277,8 @@ class TestGeventLocal(greentest.TestCase):
my_local = MyLocal()
my_local.sentinel = None
if greentest.PYPY:
import gc
gc.collect()
greentest.gc_collect_if_needed()
del created_sentinels[:]
del deleted_sentinels[:]
......@@ -298,8 +297,7 @@ class TestGeventLocal(greentest.TestCase):
for g in greenlets:
assert not g.is_alive()
gevent.sleep() # let the callbacks run
if greentest.PYPY:
gc.collect()
greentest.gc_collect_if_needed()
# The sentinels should be gone too
self.assertEqual(len(deleted_sentinels), len(greenlets))
......@@ -412,7 +410,7 @@ class TestLocalInterface(greentest.TestCase):
@greentest.skipOnPurePython("Needs C extension")
class TestCExt(greentest.TestCase):
class TestCExt(greentest.TestCase): # pragma: no cover
def test_c_extension(self):
self.assertEqual(local.__module__,
......
......@@ -9,7 +9,7 @@ from gevent.timeout import Timeout
hasattr(sys, 'gettotalrefcount'),
"Needs debug build"
)
class TestQueue(TestCase):
class TestQueue(TestCase): # pragma: no cover
# pylint:disable=bare-except,no-member
def test(self):
......
......@@ -6,17 +6,17 @@ from os import pipe
import gevent
from gevent import os
from gevent.testing import TestCase, main, LARGE_TIMEOUT
from gevent import Greenlet, joinall
from gevent import testing as greentest
from gevent.testing import mock
from gevent.testing import six
from gevent.testing.skipping import skipOnLibuvOnPyPyOnWin
class TestOS_tp(TestCase):
class TestOS_tp(greentest.TestCase):
__timeout__ = LARGE_TIMEOUT
__timeout__ = greentest.LARGE_TIMEOUT
def pipe(self):
return pipe()
......@@ -53,8 +53,8 @@ class TestOS_tp(TestCase):
# the pipe before the consumer starts, and would block the entire
# process. Therefore the next line would never finish.
joinall([producer, consumer])
assert bytesread[0] == nbytes
assert bytesread[0] == byteswritten[0]
self.assertEqual(bytesread[0], nbytes)
self.assertEqual(bytesread[0], byteswritten[0])
if sys.version_info[0] < 3:
......@@ -67,108 +67,112 @@ class TestOS_tp(TestCase):
self._test_if_pipe_blocks(six.builtins.memoryview)
if hasattr(os, 'make_nonblocking'):
@greentest.skipUnless(hasattr(os, 'make_nonblocking'),
"Only on POSIX")
class TestOS_nb(TestOS_tp):
class TestOS_nb(TestOS_tp):
def read(self, fd, count):
return os.nb_read(fd, count)
read = staticmethod(os.nb_read)
write = staticmethod(os.nb_write)
def write(self, fd, count):
return os.nb_write(fd, count)
def pipe(self):
r, w = super(TestOS_nb, self).pipe()
os.make_nonblocking(r)
os.make_nonblocking(w)
return r, w
def _make_ignored_oserror(self):
import errno
ignored_oserror = OSError()
ignored_oserror.errno = errno.EINTR
return ignored_oserror
def _check_hub_event_closed(self, mock_get_hub, fd, event):
mock_get_hub.assert_called_once_with()
hub = mock_get_hub.return_value
io = hub.loop.io
io.assert_called_once_with(fd, event)
event = io.return_value
event.close.assert_called_once_with()
def _test_event_closed_on_normal_io(self, nb_func, nb_arg,
mock_io, mock_get_hub, event):
mock_io.side_effect = [self._make_ignored_oserror(), 42]
fd = 100
result = nb_func(fd, nb_arg)
self.assertEqual(result, 42)
self._check_hub_event_closed(mock_get_hub, fd, event)
def _test_event_closed_on_io_error(self, nb_func, nb_arg,
mock_io, mock_get_hub, event):
mock_io.side_effect = [self._make_ignored_oserror(), ValueError()]
fd = 100
with self.assertRaises(ValueError):
nb_func(fd, nb_arg)
self._check_hub_event_closed(mock_get_hub, fd, event)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._write')
def test_event_closed_on_write(self, mock_write, mock_get_hub):
self._test_event_closed_on_normal_io(os.nb_write, b'buf',
mock_write, mock_get_hub,
2)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._write')
def test_event_closed_on_write_error(self, mock_write, mock_get_hub):
self._test_event_closed_on_io_error(os.nb_write, b'buf',
mock_write, mock_get_hub,
2)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._read')
def test_event_closed_on_read(self, mock_read, mock_get_hub):
self._test_event_closed_on_normal_io(os.nb_read, b'buf',
mock_read, mock_get_hub,
1)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._read')
def test_event_closed_on_read_error(self, mock_read, mock_get_hub):
self._test_event_closed_on_io_error(os.nb_read, b'buf',
mock_read, mock_get_hub,
1)
if hasattr(os, 'fork_and_watch'):
class TestForkAndWatch(TestCase):
__timeout__ = LARGE_TIMEOUT
def test_waitpid_all(self):
# Cover this specific case.
pid = os.fork_and_watch()
if pid:
os.waitpid(-1, 0)
# Can't assert on what the pid actually was,
# our testrunner may have spawned multiple children.
os._reap_children(0) # make the leakchecker happy
else:
gevent.sleep(2)
os._exit(0)
def test_waitpid_wrong_neg(self):
self.assertRaises(OSError, os.waitpid, -2, 0)
def pipe(self):
r, w = super(TestOS_nb, self).pipe()
os.make_nonblocking(r)
os.make_nonblocking(w)
return r, w
def _make_ignored_oserror(self):
import errno
ignored_oserror = OSError()
ignored_oserror.errno = errno.EINTR
return ignored_oserror
def _check_hub_event_closed(self, mock_get_hub, fd, event):
mock_get_hub.assert_called_once_with()
hub = mock_get_hub.return_value
io = hub.loop.io
io.assert_called_once_with(fd, event)
event = io.return_value
event.close.assert_called_once_with()
def _test_event_closed_on_normal_io(self, nb_func, nb_arg,
mock_io, mock_get_hub, event):
mock_io.side_effect = [self._make_ignored_oserror(), 42]
fd = 100
result = nb_func(fd, nb_arg)
self.assertEqual(result, 42)
self._check_hub_event_closed(mock_get_hub, fd, event)
def _test_event_closed_on_io_error(self, nb_func, nb_arg,
mock_io, mock_get_hub, event):
mock_io.side_effect = [self._make_ignored_oserror(), ValueError()]
fd = 100
with self.assertRaises(ValueError):
nb_func(fd, nb_arg)
self._check_hub_event_closed(mock_get_hub, fd, event)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._write')
def test_event_closed_on_write(self, mock_write, mock_get_hub):
self._test_event_closed_on_normal_io(os.nb_write, b'buf',
mock_write, mock_get_hub,
2)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._write')
def test_event_closed_on_write_error(self, mock_write, mock_get_hub):
self._test_event_closed_on_io_error(os.nb_write, b'buf',
mock_write, mock_get_hub,
2)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._read')
def test_event_closed_on_read(self, mock_read, mock_get_hub):
self._test_event_closed_on_normal_io(os.nb_read, b'buf',
mock_read, mock_get_hub,
1)
@mock.patch('gevent.os.get_hub')
@mock.patch('gevent.os._read')
def test_event_closed_on_read_error(self, mock_read, mock_get_hub):
self._test_event_closed_on_io_error(os.nb_read, b'buf',
mock_read, mock_get_hub,
1)
@greentest.skipUnless(hasattr(os, 'fork_and_watch'),
"Only on POSIX")
class TestForkAndWatch(greentest.TestCase):
__timeout__ = greentest.LARGE_TIMEOUT
def test_waitpid_all(self):
# Cover this specific case.
pid = os.fork_and_watch()
if pid:
os.waitpid(-1, 0)
# Can't assert on what the pid actually was,
# our testrunner may have spawned multiple children.
os._reap_children(0) # make the leakchecker happy
else: # pragma: no cover
gevent.sleep(2)
os._exit(0)
def test_waitpid_wrong_neg(self):
self.assertRaises(OSError, os.waitpid, -2, 0)
def test_waitpid_wrong_pos(self):
self.assertRaises(OSError, os.waitpid, 1, 0)
def test_waitpid_wrong_pos(self):
self.assertRaises(OSError, os.waitpid, 1, 0)
if __name__ == '__main__':
main()
greentest.main()
......@@ -140,7 +140,7 @@ class TestQueue(TestCase):
with gevent.Timeout(0, RuntimeError()):
try:
result = q.get()
evt.set(result)
evt.set(result) # pragma: no cover (should have raised)
except RuntimeError:
evt.set('timed out')
......@@ -169,7 +169,7 @@ class TestQueue(TestCase):
with gevent.Timeout(0, RuntimeError()):
try:
result = q.get()
evt.set(result)
evt.set(result) # pragma: no cover (should have raised)
except RuntimeError:
evt.set('timed out')
......@@ -188,7 +188,7 @@ class TestQueue(TestCase):
with gevent.Timeout(0, RuntimeError()):
try:
result = q.get()
evt.set(result)
evt.set(result) # pragma: no cover (should have raised)
except RuntimeError:
evt.set('timed out')
......
......@@ -14,7 +14,7 @@ try:
s.close()
del s
assert r() is None
except AssertionError:
except AssertionError: # pragma: no cover
import sys
if hasattr(sys, 'pypy_version_info'):
# PyPy uses a non refcounted GC which may defer
......
......@@ -8,7 +8,7 @@ import gevent.testing as greentest
def readall(sock, _):
while sock.recv(1024):
pass
pass # pragma: no cover we never actually send the data
sock.close()
......
import sys
if 'runtestcase' in sys.argv[1:]:
if 'runtestcase' in sys.argv[1:]: # pragma: no cover
import gevent
import gevent.subprocess
gevent.spawn(sys.exit, 'bye')
......@@ -15,6 +15,6 @@ else:
__file__, 'runtestcase'],
stderr=subprocess.PIPE).communicate()
if b'refs' in err: # Something to do with debug mode python builds?
assert err.startswith(b'bye'), repr(err)
assert err.startswith(b'bye'), repr(err) # pragma: no cover
else:
assert err.strip() == b'bye', repr(err)
......@@ -45,7 +45,7 @@ class _FakeTimer(object):
@property
def seconds(self):
return None
"Always returns None"
timer = exception = seconds
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment