Commit 0b2a6f95 authored by Jason Madden's avatar Jason Madden

Update 3.7 tests to latest version.

parent 8bc49537
...@@ -257,6 +257,7 @@ class DummyFTPServer(asyncore.dispatcher, threading.Thread): ...@@ -257,6 +257,7 @@ class DummyFTPServer(asyncore.dispatcher, threading.Thread):
def __init__(self, address, af=socket.AF_INET): def __init__(self, address, af=socket.AF_INET):
threading.Thread.__init__(self) threading.Thread.__init__(self)
asyncore.dispatcher.__init__(self) asyncore.dispatcher.__init__(self)
self.daemon = True
self.create_socket(af, socket.SOCK_STREAM) self.create_socket(af, socket.SOCK_STREAM)
self.bind(address) self.bind(address)
self.listen(5) self.listen(5)
...@@ -312,8 +313,6 @@ if ssl is not None: ...@@ -312,8 +313,6 @@ if ssl is not None:
def secure_connection(self): def secure_connection(self):
context = ssl.SSLContext() context = ssl.SSLContext()
# TODO: fix TLSv1.3 support
context.options |= ssl.OP_NO_TLSv1_3
context.load_cert_chain(CERTFILE) context.load_cert_chain(CERTFILE)
socket = context.wrap_socket(self.socket, socket = context.wrap_socket(self.socket,
suppress_ragged_eofs=False, suppress_ragged_eofs=False,
...@@ -405,7 +404,7 @@ if ssl is not None: ...@@ -405,7 +404,7 @@ if ssl is not None:
def close(self): def close(self):
if (isinstance(self.socket, ssl.SSLSocket) and if (isinstance(self.socket, ssl.SSLSocket) and
self.socket._sslobj is not None): self.socket._sslobj is not None):
self._do_ssl_shutdown() self._do_ssl_shutdown()
else: else:
super(SSLConnection, self).close() super(SSLConnection, self).close()
...@@ -910,8 +909,6 @@ class TestTLS_FTPClass(TestCase): ...@@ -910,8 +909,6 @@ class TestTLS_FTPClass(TestCase):
def test_context(self): def test_context(self):
self.client.quit() self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# TODO: fix TLSv1.3 support
ctx.options |= ssl.OP_NO_TLSv1_3
ctx.check_hostname = False ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE ctx.verify_mode = ssl.CERT_NONE
self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE, self.assertRaises(ValueError, ftplib.FTP_TLS, keyfile=CERTFILE,
...@@ -944,8 +941,6 @@ class TestTLS_FTPClass(TestCase): ...@@ -944,8 +941,6 @@ class TestTLS_FTPClass(TestCase):
def test_check_hostname(self): def test_check_hostname(self):
self.client.quit() self.client.quit()
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# TODO: fix TLSv1.3 support
ctx.options |= ssl.OP_NO_TLSv1_3
self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED) self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
self.assertEqual(ctx.check_hostname, True) self.assertEqual(ctx.check_hostname, True)
ctx.load_verify_locations(CAFILE) ctx.load_verify_locations(CAFILE)
...@@ -982,6 +977,7 @@ class TestTimeouts(TestCase): ...@@ -982,6 +977,7 @@ class TestTimeouts(TestCase):
self.sock.settimeout(20) self.sock.settimeout(20)
self.port = support.bind_port(self.sock) self.port = support.bind_port(self.sock)
self.server_thread = threading.Thread(target=self.server) self.server_thread = threading.Thread(target=self.server)
self.server_thread.daemon = True
self.server_thread.start() self.server_thread.start()
# Wait for the server to be ready. # Wait for the server to be ready.
self.evt.wait() self.evt.wait()
......
...@@ -481,7 +481,14 @@ class ScalableSelectorMixIn: ...@@ -481,7 +481,14 @@ class ScalableSelectorMixIn:
self.skipTest("FD limit reached") self.skipTest("FD limit reached")
raise raise
self.assertEqual(NUM_FDS // 2, len(s.select())) try:
fds = s.select()
except OSError as e:
if e.errno == errno.EINVAL and sys.platform == 'darwin':
# unexplainable errors on macOS don't need to fail the test
self.skipTest("Invalid argument error calling poll()")
raise
self.assertEqual(NUM_FDS // 2, len(fds))
class DefaultSelectorTestCase(BaseSelectorTestCase): class DefaultSelectorTestCase(BaseSelectorTestCase):
......
...@@ -893,7 +893,7 @@ class GeneralModuleTests(unittest.TestCase): ...@@ -893,7 +893,7 @@ class GeneralModuleTests(unittest.TestCase):
) )
for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2', for addr in ['0.1.1.~1', '1+.1.1.1', '::1q', '::1::2',
'1:1:1:1:1:1:1:1:1']: '1:1:1:1:1:1:1:1:1']:
with self.assertRaises(OSError): with self.assertRaises(OSError, msg=addr):
socket.gethostbyname(addr) socket.gethostbyname(addr)
with self.assertRaises(OSError, msg=explanation): with self.assertRaises(OSError, msg=explanation):
socket.gethostbyaddr(addr) socket.gethostbyaddr(addr)
......
...@@ -1846,6 +1846,7 @@ class SimpleBackgroundTests(unittest.TestCase): ...@@ -1846,6 +1846,7 @@ class SimpleBackgroundTests(unittest.TestCase):
s.connect(self.server_addr) s.connect(self.server_addr)
cert = s.getpeercert() cert = s.getpeercert()
self.assertTrue(cert) self.assertTrue(cert)
# Same with a bytes `capath` argument # Same with a bytes `capath` argument
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
ctx.verify_mode = ssl.CERT_REQUIRED ctx.verify_mode = ssl.CERT_REQUIRED
...@@ -1861,8 +1862,6 @@ class SimpleBackgroundTests(unittest.TestCase): ...@@ -1861,8 +1862,6 @@ class SimpleBackgroundTests(unittest.TestCase):
der = ssl.PEM_cert_to_DER_cert(pem) der = ssl.PEM_cert_to_DER_cert(pem)
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
ctx.verify_mode = ssl.CERT_REQUIRED ctx.verify_mode = ssl.CERT_REQUIRED
# TODO: fix TLSv1.3 support
ctx.options |= ssl.OP_NO_TLSv1_3
ctx.load_verify_locations(cadata=pem) ctx.load_verify_locations(cadata=pem)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr) s.connect(self.server_addr)
...@@ -1872,8 +1871,6 @@ class SimpleBackgroundTests(unittest.TestCase): ...@@ -1872,8 +1871,6 @@ class SimpleBackgroundTests(unittest.TestCase):
# same with DER # same with DER
ctx = ssl.SSLContext(ssl.PROTOCOL_TLS) ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
ctx.verify_mode = ssl.CERT_REQUIRED ctx.verify_mode = ssl.CERT_REQUIRED
# TODO: fix TLSv1.3 support
ctx.options |= ssl.OP_NO_TLSv1_3
ctx.load_verify_locations(cadata=der) ctx.load_verify_locations(cadata=der)
with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s: with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
s.connect(self.server_addr) s.connect(self.server_addr)
...@@ -2129,11 +2126,21 @@ class ThreadedEchoServer(threading.Thread): ...@@ -2129,11 +2126,21 @@ class ThreadedEchoServer(threading.Thread):
self.sock, server_side=True) self.sock, server_side=True)
self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol()) self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol()) self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
except (ssl.SSLError, ConnectionResetError, OSError) as e: except (ConnectionResetError, BrokenPipeError) as e:
# We treat ConnectionResetError as though it were an # We treat ConnectionResetError as though it were an
# SSLError - OpenSSL on Ubuntu abruptly closes the # SSLError - OpenSSL on Ubuntu abruptly closes the
# connection when asked to use an unsupported protocol. # connection when asked to use an unsupported protocol.
# #
# BrokenPipeError is raised in TLS 1.3 mode, when OpenSSL
# tries to send session tickets after handshake.
# https://github.com/openssl/openssl/issues/6342
self.server.conn_errors.append(str(e))
if self.server.chatty:
handle_error("\n server: bad connection attempt from " + repr(self.addr) + ":\n")
self.running = False
self.close()
return False
except (ssl.SSLError, OSError) as e:
# OSError may occur with wrong protocols, e.g. both # OSError may occur with wrong protocols, e.g. both
# sides use PROTOCOL_TLS_SERVER. # sides use PROTOCOL_TLS_SERVER.
# #
...@@ -2240,11 +2247,22 @@ class ThreadedEchoServer(threading.Thread): ...@@ -2240,11 +2247,22 @@ class ThreadedEchoServer(threading.Thread):
sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n" sys.stdout.write(" server: read %r (%s), sending back %r (%s)...\n"
% (msg, ctype, msg.lower(), ctype)) % (msg, ctype, msg.lower(), ctype))
self.write(msg.lower()) self.write(msg.lower())
except ConnectionResetError:
# XXX: OpenSSL 1.1.1 sometimes raises ConnectionResetError
# when connection is not shut down gracefully.
if self.server.chatty and support.verbose:
sys.stdout.write(
" Connection reset by peer: {}\n".format(
self.addr)
)
self.close()
self.running = False
except OSError: except OSError:
if self.server.chatty: if self.server.chatty:
handle_error("Test server failure:\n") handle_error("Test server failure:\n")
self.close() self.close()
self.running = False self.running = False
# normally, we'd just stop here, but for the test # normally, we'd just stop here, but for the test
# harness, we want to stop the server # harness, we want to stop the server
self.server.stop() self.server.stop()
...@@ -2319,6 +2337,11 @@ class ThreadedEchoServer(threading.Thread): ...@@ -2319,6 +2337,11 @@ class ThreadedEchoServer(threading.Thread):
pass pass
except KeyboardInterrupt: except KeyboardInterrupt:
self.stop() self.stop()
except BaseException as e:
if support.verbose and self.chatty:
sys.stdout.write(
' connection handling failed: ' + repr(e) + '\n')
self.sock.close() self.sock.close()
def stop(self): def stop(self):
...@@ -2716,10 +2739,7 @@ class ThreadedTests(unittest.TestCase): ...@@ -2716,10 +2739,7 @@ class ThreadedTests(unittest.TestCase):
def test_ecc_cert(self): def test_ecc_cert(self):
client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
client_context.load_verify_locations(SIGNING_CA) client_context.load_verify_locations(SIGNING_CA)
client_context.set_ciphers( client_context.set_ciphers('ECDHE:ECDSA:!NULL:!aRSA')
'TLS13-AES-128-GCM-SHA256:TLS13-CHACHA20-POLY1305-SHA256:'
'ECDHE:ECDSA:!NULL:!aRSA'
)
hostname = SIGNED_CERTFILE_ECC_HOSTNAME hostname = SIGNED_CERTFILE_ECC_HOSTNAME
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
...@@ -2769,8 +2789,6 @@ class ThreadedTests(unittest.TestCase): ...@@ -2769,8 +2789,6 @@ class ThreadedTests(unittest.TestCase):
server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER) server_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
server_context.load_cert_chain(IDNSANSFILE) server_context.load_cert_chain(IDNSANSFILE)
# TODO: fix TLSv1.3 support
server_context.options |= ssl.OP_NO_TLSv1_3
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED context.verify_mode = ssl.CERT_REQUIRED
...@@ -2821,7 +2839,7 @@ class ThreadedTests(unittest.TestCase): ...@@ -2821,7 +2839,7 @@ class ThreadedTests(unittest.TestCase):
with self.assertRaises(ssl.CertificateError): with self.assertRaises(ssl.CertificateError):
s.connect((HOST, server.port)) s.connect((HOST, server.port))
def test_wrong_cert(self): def test_wrong_cert_tls12(self):
"""Connecting when the server rejects the client's certificate """Connecting when the server rejects the client's certificate
Launch a server with CERT_REQUIRED, and check that trying to Launch a server with CERT_REQUIRED, and check that trying to
...@@ -2832,9 +2850,8 @@ class ThreadedTests(unittest.TestCase): ...@@ -2832,9 +2850,8 @@ class ThreadedTests(unittest.TestCase):
client_context.load_cert_chain(WRONG_CERT) client_context.load_cert_chain(WRONG_CERT)
# require TLS client authentication # require TLS client authentication
server_context.verify_mode = ssl.CERT_REQUIRED server_context.verify_mode = ssl.CERT_REQUIRED
# TODO: fix TLSv1.3 support # TLS 1.3 has different handshake
# With TLS 1.3, test fails with exception in server thread client_context.maximum_version = ssl.TLSVersion.TLSv1_2
server_context.options |= ssl.OP_NO_TLSv1_3
server = ThreadedEchoServer( server = ThreadedEchoServer(
context=server_context, chatty=True, connectionchatty=True, context=server_context, chatty=True, connectionchatty=True,
...@@ -2859,6 +2876,36 @@ class ThreadedTests(unittest.TestCase): ...@@ -2859,6 +2876,36 @@ class ThreadedTests(unittest.TestCase):
else: else:
self.fail("Use of invalid cert should have failed!") self.fail("Use of invalid cert should have failed!")
@unittest.skipUnless(ssl.HAS_TLSv1_3, "Test needs TLS 1.3")
def test_wrong_cert_tls13(self):
client_context, server_context, hostname = testing_context()
client_context.load_cert_chain(WRONG_CERT)
server_context.verify_mode = ssl.CERT_REQUIRED
server_context.minimum_version = ssl.TLSVersion.TLSv1_3
client_context.minimum_version = ssl.TLSVersion.TLSv1_3
server = ThreadedEchoServer(
context=server_context, chatty=True, connectionchatty=True,
)
with server, \
client_context.wrap_socket(socket.socket(),
server_hostname=hostname) as s:
# TLS 1.3 perform client cert exchange after handshake
s.connect((HOST, server.port))
try:
s.write(b'data')
s.read(4)
except ssl.SSLError as e:
if support.verbose:
sys.stdout.write("\nSSLError is %r\n" % e)
except OSError as e:
if e.errno != errno.ECONNRESET:
raise
if support.verbose:
sys.stdout.write("\nsocket.error is %r\n" % e)
else:
self.fail("Use of invalid cert should have failed!")
def test_rude_shutdown(self): def test_rude_shutdown(self):
"""A brutal shutdown of an SSL server should raise an OSError """A brutal shutdown of an SSL server should raise an OSError
in the client when attempting handshake. in the client when attempting handshake.
...@@ -3435,7 +3482,7 @@ class ThreadedTests(unittest.TestCase): ...@@ -3435,7 +3482,7 @@ class ThreadedTests(unittest.TestCase):
# Block on the accept and wait on the connection to close. # Block on the accept and wait on the connection to close.
evt.set() evt.set()
remote, peer = server.accept() remote, peer = server.accept()
remote.recv(1) remote.send(remote.recv(4))
t = threading.Thread(target=serve) t = threading.Thread(target=serve)
t.start() t.start()
...@@ -3443,6 +3490,8 @@ class ThreadedTests(unittest.TestCase): ...@@ -3443,6 +3490,8 @@ class ThreadedTests(unittest.TestCase):
evt.wait() evt.wait()
client = context.wrap_socket(socket.socket()) client = context.wrap_socket(socket.socket())
client.connect((host, port)) client.connect((host, port))
client.send(b'data')
client.recv()
client_addr = client.getsockname() client_addr = client.getsockname()
client.close() client.close()
t.join() t.join()
...@@ -3466,17 +3515,16 @@ class ThreadedTests(unittest.TestCase): ...@@ -3466,17 +3515,16 @@ class ThreadedTests(unittest.TestCase):
sock.do_handshake() sock.do_handshake()
self.assertEqual(cm.exception.errno, errno.ENOTCONN) self.assertEqual(cm.exception.errno, errno.ENOTCONN)
def test_default_ciphers(self): def test_no_shared_ciphers(self):
context = ssl.SSLContext(ssl.PROTOCOL_TLS) client_context, server_context, hostname = testing_context()
try: # OpenSSL enables all TLS 1.3 ciphers, enforce TLS 1.2 for test
# Force a set of weak ciphers on our client context client_context.options |= ssl.OP_NO_TLSv1_3
context.set_ciphers("DES") # Force different suites on client and master
except ssl.SSLError: client_context.set_ciphers("AES128")
self.skipTest("no DES cipher available") server_context.set_ciphers("AES256")
with ThreadedEchoServer(CERTFILE, with ThreadedEchoServer(context=server_context) as server:
ssl_version=ssl.PROTOCOL_TLS, with client_context.wrap_socket(socket.socket(),
chatty=False) as server: server_hostname=hostname) as s:
with context.wrap_socket(socket.socket()) as s:
with self.assertRaises(OSError): with self.assertRaises(OSError):
s.connect((HOST, server.port)) s.connect((HOST, server.port))
self.assertIn("no shared cipher", server.conn_errors[0]) self.assertIn("no shared cipher", server.conn_errors[0])
...@@ -3496,7 +3544,7 @@ class ThreadedTests(unittest.TestCase): ...@@ -3496,7 +3544,7 @@ class ThreadedTests(unittest.TestCase):
self.assertIs(s.version(), None) self.assertIs(s.version(), None)
self.assertIs(s._sslobj, None) self.assertIs(s._sslobj, None)
s.connect((HOST, server.port)) s.connect((HOST, server.port))
if ssl.OPENSSL_VERSION_INFO >= (1, 1, 1): if IS_OPENSSL_1_1_1 and ssl.HAS_TLSv1_3:
self.assertEqual(s.version(), 'TLSv1.3') self.assertEqual(s.version(), 'TLSv1.3')
elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2): elif ssl.OPENSSL_VERSION_INFO >= (1, 0, 2):
self.assertEqual(s.version(), 'TLSv1.2') self.assertEqual(s.version(), 'TLSv1.2')
...@@ -3517,9 +3565,9 @@ class ThreadedTests(unittest.TestCase): ...@@ -3517,9 +3565,9 @@ class ThreadedTests(unittest.TestCase):
with context.wrap_socket(socket.socket()) as s: with context.wrap_socket(socket.socket()) as s:
s.connect((HOST, server.port)) s.connect((HOST, server.port))
self.assertIn(s.cipher()[0], { self.assertIn(s.cipher()[0], {
'TLS13-AES-256-GCM-SHA384', 'TLS_AES_256_GCM_SHA384',
'TLS13-CHACHA20-POLY1305-SHA256', 'TLS_CHACHA20_POLY1305_SHA256',
'TLS13-AES-128-GCM-SHA256', 'TLS_AES_128_GCM_SHA256',
}) })
self.assertEqual(s.version(), 'TLSv1.3') self.assertEqual(s.version(), 'TLSv1.3')
...@@ -3605,8 +3653,6 @@ class ThreadedTests(unittest.TestCase): ...@@ -3605,8 +3653,6 @@ class ThreadedTests(unittest.TestCase):
sys.stdout.write("\n") sys.stdout.write("\n")
client_context, server_context, hostname = testing_context() client_context, server_context, hostname = testing_context()
# TODO: fix TLSv1.3 support
client_context.options |= ssl.OP_NO_TLSv1_3
server = ThreadedEchoServer(context=server_context, server = ThreadedEchoServer(context=server_context,
chatty=True, chatty=True,
...@@ -3625,7 +3671,10 @@ class ThreadedTests(unittest.TestCase): ...@@ -3625,7 +3671,10 @@ class ThreadedTests(unittest.TestCase):
# check if it is sane # check if it is sane
self.assertIsNotNone(cb_data) self.assertIsNotNone(cb_data)
self.assertEqual(len(cb_data), 12) # True for TLSv1 if s.version() == 'TLSv1.3':
self.assertEqual(len(cb_data), 48)
else:
self.assertEqual(len(cb_data), 12) # True for TLSv1
# and compare with the peers version # and compare with the peers version
s.write(b"CB tls-unique\n") s.write(b"CB tls-unique\n")
...@@ -3647,7 +3696,10 @@ class ThreadedTests(unittest.TestCase): ...@@ -3647,7 +3696,10 @@ class ThreadedTests(unittest.TestCase):
# is it really unique # is it really unique
self.assertNotEqual(cb_data, new_cb_data) self.assertNotEqual(cb_data, new_cb_data)
self.assertIsNotNone(cb_data) self.assertIsNotNone(cb_data)
self.assertEqual(len(cb_data), 12) # True for TLSv1 if s.version() == 'TLSv1.3':
self.assertEqual(len(cb_data), 48)
else:
self.assertEqual(len(cb_data), 12) # True for TLSv1
s.write(b"CB tls-unique\n") s.write(b"CB tls-unique\n")
peer_data_repr = s.read().strip() peer_data_repr = s.read().strip()
self.assertEqual(peer_data_repr, self.assertEqual(peer_data_repr,
...@@ -3925,23 +3977,20 @@ class ThreadedTests(unittest.TestCase): ...@@ -3925,23 +3977,20 @@ class ThreadedTests(unittest.TestCase):
def test_shared_ciphers(self): def test_shared_ciphers(self):
client_context, server_context, hostname = testing_context() client_context, server_context, hostname = testing_context()
if ssl.OPENSSL_VERSION_INFO >= (1, 0, 2): client_context.set_ciphers("AES128:AES256")
client_context.set_ciphers("AES128:AES256") server_context.set_ciphers("AES256")
server_context.set_ciphers("AES256") expected_algs = [
alg1 = "AES256" "AES256", "AES-256",
alg2 = "AES-256" # TLS 1.3 ciphers are always enabled
else: "TLS_CHACHA20", "TLS_AES",
client_context.set_ciphers("AES:3DES") ]
server_context.set_ciphers("3DES")
alg1 = "3DES"
alg2 = "DES-CBC3"
stats = server_params_test(client_context, server_context, stats = server_params_test(client_context, server_context,
sni_name=hostname) sni_name=hostname)
ciphers = stats['server_shared_ciphers'][0] ciphers = stats['server_shared_ciphers'][0]
self.assertGreater(len(ciphers), 0) self.assertGreater(len(ciphers), 0)
for name, tls_version, bits in ciphers: for name, tls_version, bits in ciphers:
if not alg1 in name.split("-") and alg2 not in name: if not any(alg in name for alg in expected_algs):
self.fail(name) self.fail(name)
def test_read_write_after_close_raises_valuerror(self): def test_read_write_after_close_raises_valuerror(self):
......
...@@ -1617,13 +1617,29 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -1617,13 +1617,29 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertIn(repr(error_data), str(e.exception)) self.assertIn(repr(error_data), str(e.exception))
@unittest.skipIf(not os.path.exists('/proc/self/status'),
"need /proc/self/status")
def test_restore_signals(self): def test_restore_signals(self):
# Code coverage for both values of restore_signals to make sure it # Blindly assume that cat exists on systems with /proc/self/status...
# at least does not blow up. default_proc_status = subprocess.check_output(
# A test for behavior would be complex. Contributions welcome. ['cat', '/proc/self/status'],
subprocess.call([sys.executable, "-c", ""], restore_signals=True) restore_signals=False)
subprocess.call([sys.executable, "-c", ""], restore_signals=False) for line in default_proc_status.splitlines():
if line.startswith(b'SigIgn'):
default_sig_ign_mask = line
break
else:
self.skipTest("SigIgn not found in /proc/self/status.")
restored_proc_status = subprocess.check_output(
['cat', '/proc/self/status'],
restore_signals=True)
for line in restored_proc_status.splitlines():
if line.startswith(b'SigIgn'):
restored_sig_ign_mask = line
break
self.assertNotEqual(default_sig_ign_mask, restored_sig_ign_mask,
msg="restore_signals=True should've unblocked "
"SIGPIPE and friends.")
def test_start_new_session(self): def test_start_new_session(self):
# For code coverage of calling setsid(). We don't care if we get an # For code coverage of calling setsid(). We don't care if we get an
...@@ -2174,7 +2190,6 @@ class POSIXProcessTestCase(BaseTestCase): ...@@ -2174,7 +2190,6 @@ class POSIXProcessTestCase(BaseTestCase):
to descriptor(s) {read_fds} instead of descriptor {to_fd}. to descriptor(s) {read_fds} instead of descriptor {to_fd}.
""") """)
self.assertEqual([to_fd], read_fds, msg) self.assertEqual([to_fd], read_fds, msg)
finally: finally:
self._restore_fds(saved_fds) self._restore_fds(saved_fds)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment