Commit 514f9a9e authored by Jason Madden's avatar Jason Madden

Update pypy tests and some exclusions for it.

parent 7da80407
......@@ -8,8 +8,7 @@ coverage>=4.0
coveralls>=1.0
cffi
futures
# Makes tests faster, but causes issues on the old
# linux version Travis CI uses. We have a workaround.
# Makes tests faster
psutil
# For viewing README.rst (restview --long-description),
# CONTRIBUTING.rst, etc.
......
-----BEGIN PRIVATE KEY-----
MIICdgIBADANBgkqhkiG9w0BAQEFAASCAmAwggJcAgEAAoGBAOoy7/QOtTjQ0niE
6uDcTwtkC0R2Tvy1AjVnXohCntZfdzbTGDoYTgXSOLsP8A697jUiJ8VCePGH50xG
Z4DKnAF3a9O3a9nr2pLXb0iY3XOMv+YEBii7CfI+3oxFYgCl0sMgHzDD2ZTVYAsm
DWgLUVsE2gHEccRwrM2tPf2EgR+FAgMBAAECgYEA3qyfyYVSeTrTYxO93x6ZaVMu
A2IZp9zSxMQL9bKiI2GRj+cV2ebSCGbg2btFnD6qBor7FWsmYz+8g6FNN/9sY4az
61rMqMtQvLBe+7L8w70FeTze4qQ4Y1oQri0qD6tBWhDVlpnbI5Py9bkZKD67yVUk
elcEA/5x4PrYXkuqsAECQQD80NjT0mDvaY0JOOaQFSEpMv6QiUA8GGX8Xli7IoKb
tAolPG8rQBa+qSpcWfDMTrWw/aWHuMEEQoP/bVDH9W4FAkEA7SYQbBAKnojZ5A3G
kOHdV7aeivRQxQk/JN8Fb8oKB9Csvpv/BsuGxPKXHdhFa6CBTTsNRtHQw/szPo4l
xMIjgQJAPoMxqibR+0EBM6+TKzteSL6oPXsCnBl4Vk/J5vPgkbmR7KUl4+7j8N8J
b2554TrxKEN/w7CGYZRE6UrRd7ATNQJAWD7Yz41sli+wfPdPU2xo1BHljyl4wMk/
EPZYbI/PCbdyAH/F935WyQTIjNeEhZc1Zkq6FwdOWw8ns3hrv3rKgQJAHXv1BqUa
czGPIFxX2TNoqtcl6/En4vrxVB1wzsfzkkDAg98kBl7qsF+S3qujSzKikjeaVbI2
/CyWR2P3yLtOmA==
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIIDcjCCAtugAwIBAgIJAN5dc9TOWjB7MA0GCSqGSIb3DQEBCwUAMF0xCzAJBgNV
BAYTAlhZMRcwFQYDVQQHDA5DYXN0bGUgQW50aHJheDEjMCEGA1UECgwaUHl0aG9u
IFNvZnR3YXJlIEZvdW5kYXRpb24xEDAOBgNVBAMMB2FsbHNhbnMwHhcNMTYwODA1
MTAyMTExWhcNMjYwODAzMTAyMTExWjBdMQswCQYDVQQGEwJYWTEXMBUGA1UEBwwO
Q2FzdGxlIEFudGhyYXgxIzAhBgNVBAoMGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0
aW9uMRAwDgYDVQQDDAdhbGxzYW5zMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKB
gQDqMu/0DrU40NJ4hOrg3E8LZAtEdk78tQI1Z16IQp7WX3c20xg6GE4F0ji7D/AO
ve41IifFQnjxh+dMRmeAypwBd2vTt2vZ69qS129ImN1zjL/mBAYouwnyPt6MRWIA
pdLDIB8ww9mU1WALJg1oC1FbBNoBxHHEcKzNrT39hIEfhQIDAQABo4IBODCCATQw
ggEwBgNVHREEggEnMIIBI4IHYWxsc2Fuc6AeBgMqAwSgFwwVc29tZSBvdGhlciBp
ZGVudGlmaWVyoDUGBisGAQUCAqArMCmgEBsOS0VSQkVST1MuUkVBTE2hFTAToAMC
AQGhDDAKGwh1c2VybmFtZYEQdXNlckBleGFtcGxlLm9yZ4IPd3d3LmV4YW1wbGUu
b3JnpGcwZTELMAkGA1UEBhMCWFkxFzAVBgNVBAcMDkNhc3RsZSBBbnRocmF4MSMw
IQYDVQQKDBpQeXRob24gU29mdHdhcmUgRm91bmRhdGlvbjEYMBYGA1UEAwwPZGly
bmFtZSBleGFtcGxlhhdodHRwczovL3d3dy5weXRob24ub3JnL4cEfwAAAYcQAAAA
AAAAAAAAAAAAAAAAAYgEKgMEBTANBgkqhkiG9w0BAQsFAAOBgQAy16h+F+nOmeiT
VWR0fc8F/j6FcadbLseAUaogcC15OGxCl4UYpLV88HBkABOoGCpP155qwWTwOrdG
iYPGJSusf1OnJEbvzFejZf6u078bPd9/ZL4VWLjv+FPGkjd+N+/OaqMvgj8Lu99f
3Y/C4S7YbHxxwff6C6l2Xli+q6gnuQ==
-----END CERTIFICATE-----
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBANcLaMB7T/Wi9DBc
PltGzgt8cxsv55m7PQPHMZvn6Ke8xmNqcmEzib8opRwKGrCV6TltKeFlNSg8dwQK
Tl4ktyTkGCVweRQJ37AkBayvEBml5s+QD4vlhqkJPsL/Nsd+fnqngOGc5+59+C6r
s3XpiLlF5ah/z8q92Mnw54nypw1JAgMBAAECgYBE3t2Mj7GbDLZB6rj5yKJioVfI
BD6bSJEQ7bGgqdQkLFwpKMU7BiN+ekjuwvmrRkesYZ7BFgXBPiQrwhU5J28Tpj5B
EOMYSIOHfzdalhxDGM1q2oK9LDFiCotTaSdEzMYadel5rmKXJ0zcK2Jho0PCuECf
tf/ghRxK+h1Hm0tKgQJBAO6MdGDSmGKYX6/5kPDje7we/lSLorSDkYmV0tmVShsc
JxgaGaapazceA/sHL3Myx7Eenkip+yPYDXEDFvAKNDECQQDmxsT9NOp6mo7ISvky
GFr2vVHsJ745BMWoma4rFjPBVnS8RkgK+b2EpDCdZSrQ9zw2r8sKTgrEyrDiGTEg
wJyZAkA8OOc0flYMJg2aHnYR6kwVjPmGHI5h5gk648EMPx0rROs1sXkiUwkHLCOz
HvhCq+Iv+9vX2lnVjbiu/CmxRdIxAkA1YEfzoKeTD+hyXxTgB04Sv5sRGegfXAEz
i8gC4zG5R/vcCA1lrHmvEiLEZL/QcT6WD3bQvVg0SAU9ZkI8pxARAkA7yqMSvP1l
gJXy44R+rzpLYb1/PtiLkIkaKG3x9TUfPnfD2jY09fPkZlfsRU3/uS09IkhSwimV
d5rWoljEfdou
-----END PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICXTCCAcagAwIBAgIJALVQzebTtrXFMA0GCSqGSIb3DQEBBQUAMGIxCzAJBgNV
BAYTAlhZMRcwFQYDVQQHDA5DYXN0bGUgQW50aHJheDEjMCEGA1UECgwaUHl0aG9u
IFNvZnR3YXJlIEZvdW5kYXRpb24xFTATBgNVBAMMDGZha2Vob3N0bmFtZTAeFw0x
NDExMjMxNzAwMDdaFw0yNDExMjAxNzAwMDdaMGIxCzAJBgNVBAYTAlhZMRcwFQYD
VQQHDA5DYXN0bGUgQW50aHJheDEjMCEGA1UECgwaUHl0aG9uIFNvZnR3YXJlIEZv
dW5kYXRpb24xFTATBgNVBAMMDGZha2Vob3N0bmFtZTCBnzANBgkqhkiG9w0BAQEF
AAOBjQAwgYkCgYEA1wtowHtP9aL0MFw+W0bOC3xzGy/nmbs9A8cxm+fop7zGY2py
YTOJvyilHAoasJXpOW0p4WU1KDx3BApOXiS3JOQYJXB5FAnfsCQFrK8QGaXmz5AP
i+WGqQk+wv82x35+eqeA4Zzn7n34LquzdemIuUXlqH/Pyr3YyfDnifKnDUkCAwEA
AaMbMBkwFwYDVR0RBBAwDoIMZmFrZWhvc3RuYW1lMA0GCSqGSIb3DQEBBQUAA4GB
AKuay3vDKfWzt5+ch/HHBsert84ISot4fUjzXDA/oOgTOEjVcSShHxqNShMOW1oA
QYBpBB/5Kx5RkD/w6imhucxt2WQPRgjX4x4bwMipVH/HvFDp03mG51/Cpi1TyZ74
El7qa/Pd4lHhOLzMKBA6503fpeYSFUIBxZbGLqylqRK7
-----END CERTIFICATE-----
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
......@@ -29,9 +29,9 @@ class SelectTestCase(unittest.TestCase):
self.assertIsNot(w, x)
def test_select(self):
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 0.1; done'
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r')
for tout in (0, 0.1, 0.2, 0.4, 0.8, 1.6) + (None,)*10:
for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
if test_support.verbose:
print 'timeout =', tout
rfd, wfd, xfd = select.select([p], [], [], tout)
......
......@@ -179,31 +179,31 @@ class DebuggingServerTests(unittest.TestCase):
def testBasic(self):
# connect
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit()
def testNOOP(self):
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
expected = (250, 'Ok')
self.assertEqual(smtp.noop(), expected)
smtp.quit()
def testRSET(self):
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
expected = (250, 'Ok')
self.assertEqual(smtp.rset(), expected)
smtp.quit()
def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
expected = (502, 'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected)
smtp.quit()
def testVRFY(self):
# VRFY isn't implemented in DebuggingServer
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
expected = (502, 'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
......@@ -212,21 +212,21 @@ class DebuggingServerTests(unittest.TestCase):
def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.helo()
expected = (503, 'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected)
smtp.quit()
def testHELP(self):
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.quit()
def testSend(self):
# connect and send mail
m = 'A test message'
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.sendmail('John', 'Sally', m)
# XXX(nnorwitz): this test is flaky and dies with a bad file descriptor
# in asyncore. This sleep might help, but should really be fixed
......@@ -292,6 +292,33 @@ class BadHELOServerTests(unittest.TestCase):
HOST, self.port, 'localhost', 3)
@unittest.skipUnless(threading, 'Threading required for this test.')
class TooLongLineTests(unittest.TestCase):
respdata = '250 OK' + ('.' * smtplib._MAXLINE * 2) + '\n'
def setUp(self):
self.old_stdout = sys.stdout
self.output = StringIO.StringIO()
sys.stdout = self.output
self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(15)
self.port = test_support.bind_port(self.sock)
servargs = (self.evt, self.respdata, self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
def tearDown(self):
self.evt.wait()
sys.stdout = self.old_stdout
def testLineTooLong(self):
self.assertRaises(smtplib.SMTPResponseException, smtplib.SMTP,
HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A',
'Ms.B@somewhere.com':'Sally B',
'Mrs.C@somewhereesle.com':'Ruth C',
......@@ -507,11 +534,27 @@ class SMTPSimTests(unittest.TestCase):
#TODO: add tests for correct AUTH method fallback now that the
#test infrastructure can support it.
def test_quit_resets_greeting(self):
smtp = smtplib.SMTP(HOST, self.port,
local_hostname='localhost',
timeout=15)
code, message = smtp.ehlo()
self.assertEqual(code, 250)
self.assertIn('size', smtp.esmtp_features)
smtp.quit()
self.assertNotIn('size', smtp.esmtp_features)
smtp.connect(HOST, self.port)
self.assertNotIn('size', smtp.esmtp_features)
smtp.ehlo_or_helo_if_needed()
self.assertIn('size', smtp.esmtp_features)
smtp.quit()
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, DebuggingServerTests,
NonConnectingTests,
BadHELOServerTests, SMTPSimTests)
BadHELOServerTests, SMTPSimTests,
TooLongLineTests)
if __name__ == '__main__':
test_main()
......@@ -2,6 +2,7 @@ import unittest
from test import test_support
import errno
import itertools
import socket
import select
import time
......@@ -11,9 +12,14 @@ import sys
import os
import array
import contextlib
from weakref import proxy
import signal
import math
import weakref
try:
import _socket
except ImportError:
_socket = None
def try_address(host, port=0, family=socket.AF_INET):
"""Try to bind a socket on the given host:port and return True
......@@ -80,7 +86,7 @@ class ThreadableTest:
clientTearDown ()
Any new test functions within the class must then define
tests in pairs, where the test name is preceeded with a
tests in pairs, where the test name is preceded with a
'_' to indicate the client portion of the test. Ex:
def testFoo(self):
......@@ -243,9 +249,22 @@ class SocketPairTest(unittest.TestCase, ThreadableTest):
class GeneralModuleTests(unittest.TestCase):
@unittest.skipUnless(_socket is not None, 'need _socket module')
def test_csocket_repr(self):
s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
try:
expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>'
% (s.fileno(), s.family, s.type, s.proto))
self.assertEqual(repr(s), expected)
finally:
s.close()
expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>'
% (s.family, s.type, s.proto))
self.assertEqual(repr(s), expected)
def test_weakref(self):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
p = proxy(s)
p = weakref.proxy(s)
self.assertEqual(p.fileno(), s.fileno())
s.close()
s = None
......@@ -257,6 +276,14 @@ class GeneralModuleTests(unittest.TestCase):
else:
self.fail('Socket proxy still exists')
def test_weakref__sock(self):
s = socket.socket()._sock
w = weakref.ref(s)
self.assertIs(w(), s)
del s
test_support.gc_collect()
self.assertIsNone(w())
def testSocketError(self):
# Testing socket module exceptions
def raise_error(*args, **kwargs):
......@@ -273,7 +300,7 @@ class GeneralModuleTests(unittest.TestCase):
"Error raising socket exception.")
def testSendtoErrors(self):
# Testing that sendto doens't masks failures. See #10169.
# Testing that sendto doesn't mask failures. See #10169.
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
s.bind(('', 0))
......@@ -603,17 +630,24 @@ class GeneralModuleTests(unittest.TestCase):
sock.close()
def test_getsockaddrarg(self):
host = '0.0.0.0'
port = self._get_unused_port(bind_address=host)
sock = socket.socket()
self.addCleanup(sock.close)
port = test_support.find_unused_port()
big_port = port + 65536
neg_port = port - 65536
sock = socket.socket()
try:
self.assertRaises((OverflowError, ValueError), sock.bind, (host, big_port))
self.assertRaises((OverflowError, ValueError), sock.bind, (host, neg_port))
sock.bind((host, port))
finally:
sock.close()
self.assertRaises((OverflowError, ValueError), sock.bind, (HOST, big_port))
self.assertRaises((OverflowError, ValueError), sock.bind, (HOST, neg_port))
# Since find_unused_port() is inherently subject to race conditions, we
# call it a couple times if necessary.
for i in itertools.count():
port = test_support.find_unused_port()
try:
sock.bind((HOST, port))
except OSError as e:
if e.errno != errno.EADDRINUSE or i == 5:
raise
else:
break
@unittest.skipUnless(os.name == "nt", "Windows specific")
def test_sock_ioctl(self):
......@@ -677,7 +711,7 @@ class GeneralModuleTests(unittest.TestCase):
pass
def check_sendall_interrupted(self, with_timeout):
# socketpair() is not stricly required, but it makes things easier.
# socketpair() is not strictly required, but it makes things easier.
if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
self.skipTest("signal.alarm and socket.socketpair required for this test")
# Our signal handlers clobber the C errno by calling a math function
......@@ -706,7 +740,6 @@ class GeneralModuleTests(unittest.TestCase):
c.close()
s.close()
@unittest.skip("Needs fix in CFFI; hangs forever")
def test_sendall_interrupted(self):
self.check_sendall_interrupted(False)
......@@ -797,7 +830,7 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.sendall(big_chunk)
@unittest.skipUnless(hasattr(socket, 'fromfd'),
'socket.fromfd not availble')
'socket.fromfd not available')
def testFromFd(self):
# Testing fromfd()
fd = self.cli_conn.fileno()
......@@ -1292,15 +1325,11 @@ class NetworkConnectionNoServer(unittest.TestCase):
def mocked_socket_module(self):
"""Return a socket which times out on connect"""
old_socket = socket.socket
import gevent.socket
old_g_socket = gevent.socket.socket
socket.socket = self.MockSocket
gevent.socket.socket = self.MockSocket
try:
yield
finally:
socket.socket = old_socket
gevent.socket.socket = old_g_socket
def test_connect(self):
port = test_support.find_unused_port()
......@@ -1734,7 +1763,7 @@ class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
self.conn, self.connaddr = self.srv.accept()
def clientSetUp(self):
# The is a hittable race between serverExplicitReady() and the
# There is a hittable race between serverExplicitReady() and the
# accept() call; sleep a little while to avoid it, otherwise
# we could get an exception
time.sleep(0.1)
......
......@@ -158,6 +158,8 @@ class SocketServerTest(unittest.TestCase):
if verbose: print "waiting for server"
server.shutdown()
t.join()
server.server_close()
self.assertRaises(socket.error, server.socket.fileno)
if verbose: print "done"
def stream_examine(self, proto, addr):
......@@ -173,6 +175,8 @@ class SocketServerTest(unittest.TestCase):
def dgram_examine(self, proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
if HAVE_UNIX_SOCKETS and proto == socket.AF_UNIX:
s.bind(self.pickaddr(proto))
s.sendto(TEST_STR, addr)
buf = data = receive(s, 100)
while data and '\n' not in buf:
......@@ -267,27 +271,24 @@ class SocketServerTest(unittest.TestCase):
# Make sure select was called again:
self.assertGreater(mock_select.called, 1)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
# @requires_unix_sockets
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# @requires_unix_sockets
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# @requires_unix_sockets
# @requires_forking
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
@requires_unix_sockets
def test_UnixDatagramServer(self):
self.run_server(SocketServer.UnixDatagramServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@requires_unix_sockets
def test_ThreadingUnixDatagramServer(self):
self.run_server(SocketServer.ThreadingUnixDatagramServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@requires_unix_sockets
@requires_forking
def test_ForkingUnixDatagramServer(self):
self.run_server(ForkingUnixDatagramServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@reap_threads
def test_shutdown(self):
......@@ -314,6 +315,40 @@ class SocketServerTest(unittest.TestCase):
for t, s in threads:
t.join()
def test_tcpserver_bind_leak(self):
# Issue #22435: the server socket wouldn't be closed if bind()/listen()
# failed.
# Create many servers for which bind() will fail, to see if this result
# in FD exhaustion.
for i in range(1024):
with self.assertRaises(OverflowError):
SocketServer.TCPServer((HOST, -1),
SocketServer.StreamRequestHandler)
class MiscTestCase(unittest.TestCase):
def test_shutdown_request_called_if_verify_request_false(self):
# Issue #26309: BaseServer should call shutdown_request even if
# verify_request is False
class MyServer(SocketServer.TCPServer):
def verify_request(self, request, client_address):
return False
shutdown_called = 0
def shutdown_request(self, request):
self.shutdown_called += 1
SocketServer.TCPServer.shutdown_request(self, request)
server = MyServer((HOST, 0), SocketServer.StreamRequestHandler)
s = socket.socket(server.address_family, socket.SOCK_STREAM)
s.connect(server.server_address)
s.close()
server.handle_request()
self.assertEqual(server.shutdown_called, 1)
server.server_close()
def test_main():
if imp.lock_held():
......
......@@ -26,6 +26,9 @@ ssl = support.import_module("ssl")
PROTOCOLS = sorted(ssl._PROTOCOL_NAMES)
HOST = support.HOST
IS_LIBRESSL = ssl.OPENSSL_VERSION.startswith('LibreSSL')
IS_OPENSSL_1_1 = not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0)
def data_file(*name):
return os.path.join(os.path.dirname(__file__), *name)
......@@ -57,6 +60,8 @@ CRLFILE = data_file("revocation.crl")
SIGNED_CERTFILE = data_file("keycert3.pem")
SIGNED_CERTFILE2 = data_file("keycert4.pem")
SIGNING_CA = data_file("pycacert.pem")
# cert with all kinds of subject alt names
ALLSANFILE = data_file("allsans.pem")
REMOTE_HOST = "self-signed.pythontest.net"
REMOTE_ROOT_CERT = data_file("selfsigned_pythontestdotnet.pem")
......@@ -164,7 +169,6 @@ class BasicSocketTests(unittest.TestCase):
self.assertIn(ssl.HAS_SNI, {True, False})
self.assertIn(ssl.HAS_ECDH, {True, False})
def test_random(self):
v = ssl.RAND_status()
if support.verbose:
......@@ -245,6 +249,27 @@ class BasicSocketTests(unittest.TestCase):
self.assertEqual(p['subjectAltName'], san)
def test_parse_all_sans(self):
p = ssl._ssl._test_decode_cert(ALLSANFILE)
self.assertEqual(p['subjectAltName'],
(
('DNS', 'allsans'),
('othername', '<unsupported>'),
('othername', '<unsupported>'),
('email', 'user@example.org'),
('DNS', 'www.example.org'),
('DirName',
((('countryName', 'XY'),),
(('localityName', 'Castle Anthrax'),),
(('organizationName', 'Python Software Foundation'),),
(('commonName', 'dirname example'),))),
('URI', 'https://www.python.org/'),
('IP Address', '127.0.0.1'),
('IP Address', '0:0:0:0:0:0:0:1\n'),
('Registered ID', '1.2.3.4.5')
)
)
def test_DER_to_PEM(self):
with open(CAFILE_CACERT, 'r') as f:
pem = f.read()
......@@ -281,9 +306,9 @@ class BasicSocketTests(unittest.TestCase):
self.assertGreaterEqual(status, 0)
self.assertLessEqual(status, 15)
# Version string as returned by {Open,Libre}SSL, the format might change
if "LibreSSL" in s:
self.assertTrue(s.startswith("LibreSSL {:d}.{:d}".format(major, minor)),
(s, t))
if IS_LIBRESSL:
self.assertTrue(s.startswith("LibreSSL {:d}".format(major)),
(s, t, hex(n)))
else:
self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
(s, t))
......@@ -742,15 +767,15 @@ class ContextTests(unittest.TestCase):
def test_options(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
# OP_ALL | OP_NO_SSLv2 | OP_NO_SSLv3 is the default value
self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3,
ctx.options)
default = (ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3)
if not IS_LIBRESSL and ssl.OPENSSL_VERSION_INFO >= (1, 1, 0):
default |= ssl.OP_NO_COMPRESSION
self.assertEqual(default, ctx.options)
ctx.options |= ssl.OP_NO_TLSv1
self.assertEqual(ssl.OP_ALL | ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1,
ctx.options)
self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
if can_clear_options():
ctx.options = (ctx.options & ~ssl.OP_NO_SSLv2) | ssl.OP_NO_TLSv1
self.assertEqual(ssl.OP_ALL | ssl.OP_NO_TLSv1 | ssl.OP_NO_SSLv3,
ctx.options)
ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
self.assertEqual(default, ctx.options)
ctx.options = 0
self.assertEqual(0, ctx.options)
else:
......@@ -1088,6 +1113,7 @@ class ContextTests(unittest.TestCase):
self.assertRaises(TypeError, ctx.load_default_certs, 'SERVER_AUTH')
@unittest.skipIf(sys.platform == "win32", "not-Windows specific")
@unittest.skipIf(IS_LIBRESSL, "LibreSSL doesn't support env vars")
def test_load_default_certs_env(self):
ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
with support.EnvironmentVarGuard() as env:
......@@ -1534,7 +1560,6 @@ class NetworkedTests(unittest.TestCase):
sys.stdout.write("%s\n" % x)
else:
self.fail("Got server certificate %s for %s:%s!" % (pem, host, port))
pem = ssl.get_server_certificate((host, port),
ca_certs=cert)
if not pem:
......@@ -2489,8 +2514,6 @@ else:
def test_asyncore_server(self):
"""Check the example asyncore integration."""
indata = "TEST MESSAGE of mixed case\n"
if support.verbose:
sys.stdout.write("\n")
......@@ -2783,7 +2806,7 @@ else:
with closing(context.wrap_socket(socket.socket())) as s:
self.assertIs(s.version(), None)
s.connect((HOST, server.port))
self.assertEqual(s.version(), "TLSv1")
self.assertEqual(s.version(), 'TLSv1')
self.assertIs(s.version(), None)
@unittest.skipUnless(ssl.HAS_ECDH, "test requires ECDH-enabled OpenSSL")
......@@ -2925,24 +2948,36 @@ else:
(['http/3.0', 'http/4.0'], None)
]
for client_protocols, expected in protocol_tests:
server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
server_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
server_context.load_cert_chain(CERTFILE)
server_context.set_alpn_protocols(server_protocols)
client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
client_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
client_context.load_cert_chain(CERTFILE)
client_context.set_alpn_protocols(client_protocols)
stats = server_params_test(client_context, server_context,
chatty=True, connectionchatty=True)
msg = "failed trying %s (s) and %s (c).\n" \
"was expecting %s, but got %%s from the %%s" \
% (str(server_protocols), str(client_protocols),
str(expected))
client_result = stats['client_alpn_protocol']
self.assertEqual(client_result, expected, msg % (client_result, "client"))
server_result = stats['server_alpn_protocols'][-1] \
if len(stats['server_alpn_protocols']) else 'nothing'
self.assertEqual(server_result, expected, msg % (server_result, "server"))
try:
stats = server_params_test(client_context,
server_context,
chatty=True,
connectionchatty=True)
except ssl.SSLError as e:
stats = e
if expected is None and IS_OPENSSL_1_1:
# OpenSSL 1.1.0 raises handshake error
self.assertIsInstance(stats, ssl.SSLError)
else:
msg = "failed trying %s (s) and %s (c).\n" \
"was expecting %s, but got %%s from the %%s" \
% (str(server_protocols), str(client_protocols),
str(expected))
client_result = stats['client_alpn_protocol']
self.assertEqual(client_result, expected,
msg % (client_result, "client"))
server_result = stats['server_alpn_protocols'][-1] \
if len(stats['server_alpn_protocols']) else 'nothing'
self.assertEqual(server_result, expected,
msg % (server_result, "server"))
def test_selected_npn_protocol(self):
# selected_npn_protocol() is None unless NPN is used
......
......@@ -20,7 +20,6 @@ except ImportError:
threading = None
mswindows = (sys.platform == "win32")
PYPY = hasattr(sys, 'pypy_version_info')
#
# Depends on the following external programs: Python
......@@ -33,16 +32,6 @@ PYPY = hasattr(sys, 'pypy_version_info')
# SETBINARY = ''
try:
mkstemp = tempfile.mkstemp
except AttributeError:
# tempfile.mkstemp is not available
def mkstemp():
"""Replacement for mkstemp, calling mktemp."""
fname = tempfile.mktemp()
return os.open(fname, os.O_RDWR|os.O_CREAT), fname
class BaseTestCase(unittest.TestCase):
def setUp(self):
# Try to minimize the number of children we have so this test
......@@ -194,8 +183,8 @@ class ProcessTestCase(BaseTestCase):
p.wait()
self.assertEqual(p.returncode, 47)
@unittest.skipIf(sysconfig.is_python_build() or PYPY,
"need an installed Python. See #7774. Also fails to get sys.prefix on stock PyPy")
@unittest.skipIf(sysconfig.is_python_build(),
"need an installed Python. See #7774")
def test_executable_without_cwd(self):
# For a normal installation, it should work without 'cwd'
# argument. For test runs in the build directory, see #7774.
......@@ -296,6 +285,27 @@ class ProcessTestCase(BaseTestCase):
tf.seek(0)
self.assertStderrEqual(tf.read(), "strawberry")
def test_stderr_redirect_with_no_stdout_redirect(self):
# test stderr=STDOUT while stdout=None (not set)
# - grandchild prints to stderr
# - child redirects grandchild's stderr to its stdout
# - the parent should get grandchild's stderr in child's stdout
p = subprocess.Popen([sys.executable, "-c",
'import sys, subprocess;'
'rc = subprocess.call([sys.executable, "-c",'
' "import sys;"'
' "sys.stderr.write(\'42\')"],'
' stderr=subprocess.STDOUT);'
'sys.exit(rc)'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
#NOTE: stdout should get stderr from grandchild
self.assertStderrEqual(stdout, b'42')
self.assertStderrEqual(stderr, b'') # should be empty
self.assertEqual(p.returncode, 0)
def test_stdout_stderr_pipe(self):
# capture stdout and stderr to the same pipe
p = subprocess.Popen([sys.executable, "-c",
......@@ -416,7 +426,7 @@ class ProcessTestCase(BaseTestCase):
def test_communicate_pipe_fd_leak(self):
fd_directory = '/proc/%d/fd' % os.getpid()
num_fds_before_popen = len(os.listdir(fd_directory))
p = subprocess.Popen([sys.executable, "-c", "print()"],
p = subprocess.Popen([sys.executable, "-c", "print('')"],
stdout=subprocess.PIPE)
p.communicate()
num_fds_after_communicate = len(os.listdir(fd_directory))
......@@ -669,9 +679,9 @@ class ProcessTestCase(BaseTestCase):
def test_handles_closed_on_exception(self):
# If CreateProcess exits with an error, ensure the
# duplicate output handles are released
ifhandle, ifname = mkstemp()
ofhandle, ofname = mkstemp()
efhandle, efname = mkstemp()
ifhandle, ifname = tempfile.mkstemp()
ofhandle, ofname = tempfile.mkstemp()
efhandle, efname = tempfile.mkstemp()
try:
subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
stderr=efhandle)
......@@ -861,7 +871,7 @@ class POSIXProcessTestCase(BaseTestCase):
def test_args_string(self):
# args is a string
f, fname = mkstemp()
f, fname = tempfile.mkstemp()
os.write(f, "#!/bin/sh\n")
os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
......@@ -905,7 +915,7 @@ class POSIXProcessTestCase(BaseTestCase):
def test_call_string(self):
# call() function with string argument on UNIX
f, fname = mkstemp()
f, fname = tempfile.mkstemp()
os.write(f, "#!/bin/sh\n")
os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
sys.executable)
......@@ -1061,7 +1071,7 @@ class POSIXProcessTestCase(BaseTestCase):
def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
# open up some temporary files
temps = [mkstemp() for i in range(3)]
temps = [tempfile.mkstemp() for i in range(3)]
temp_fds = [fd for fd, fname in temps]
try:
# unlink the files -- we won't need to reopen them
......@@ -1384,7 +1394,7 @@ class CommandsWithSpaces (BaseTestCase):
def setUp(self):
super(CommandsWithSpaces, self).setUp()
f, fname = mkstemp(".py", "te st")
f, fname = tempfile.mkstemp(".py", "te st")
self.fname = fname.lower ()
os.write(f, b"import sys;"
b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
......
......@@ -248,8 +248,8 @@ class ReadTests(TestCase):
func = getattr(telnet, func_name)
self.assertRaises(EOFError, func)
# read_eager and read_very_eager make the same gaurantees
# (they behave differently but we only test the gaurantees)
# read_eager and read_very_eager make the same guarantees
# (they behave differently but we only test the guarantees)
def test_read_very_eager_A(self):
self._test_read_any_eager_A('read_very_eager')
def test_read_very_eager_B(self):
......
......@@ -234,7 +234,12 @@ class TestForkInThread(unittest.TestCase):
if pid == 0: # child
os.close(self.read_fd)
os.write(self.write_fd, "OK")
sys.exit(0)
# Exiting the thread normally in the child process can leave
# any additional threads (such as the one started by
# importing _tkinter) still running, and this can prevent
# the half-zombie child process from being cleaned up. See
# Issue #26456.
os._exit(0)
else: # parent
os.close(self.write_fd)
......
......@@ -51,7 +51,7 @@ class TestThread(threading.Thread):
self.nrunning.inc()
if verbose:
print self.nrunning.get(), 'tasks are running'
self.testcase.assertTrue(self.nrunning.get() <= 3)
self.testcase.assertLessEqual(self.nrunning.get(), 3)
time.sleep(delay)
if verbose:
......@@ -59,7 +59,7 @@ class TestThread(threading.Thread):
with self.mutex:
self.nrunning.dec()
self.testcase.assertTrue(self.nrunning.get() >= 0)
self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
if verbose:
print '%s is finished. %d tasks are running' % (
self.name, self.nrunning.get())
......@@ -92,25 +92,25 @@ class ThreadTests(BaseTestCase):
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
threads.append(t)
self.assertEqual(t.ident, None)
self.assertTrue(re.match('<TestThread\(.*, initial\)>', repr(t)))
self.assertIsNone(t.ident)
self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
t.start()
if verbose:
print 'waiting for all tasks to complete'
for t in threads:
t.join(NUMTASKS)
self.assertTrue(not t.is_alive())
self.assertFalse(t.is_alive())
self.assertNotEqual(t.ident, 0)
self.assertFalse(t.ident is None)
self.assertTrue(re.match('<TestThread\(.*, \w+ -?\d+\)>', repr(t)))
self.assertIsNotNone(t.ident)
self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
if verbose:
print 'all tasks done'
self.assertEqual(numrunning.get(), 0)
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
self.assertIsNotNone(threading.currentThread().ident)
def f():
ident.append(threading.currentThread().ident)
done.set()
......@@ -118,7 +118,7 @@ class ThreadTests(BaseTestCase):
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
self.assertIsNotNone(ident[0])
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
......@@ -237,7 +237,7 @@ class ThreadTests(BaseTestCase):
self.assertTrue(ret)
if verbose:
print " verifying worker hasn't exited"
self.assertTrue(not t.finished)
self.assertFalse(t.finished)
if verbose:
print " attempting to raise asynch exception in worker"
result = set_async_exc(ctypes.c_long(t.id), exception)
......@@ -706,52 +706,6 @@ class ThreadJoinOnShutdown(BaseTestCase):
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
@unittest.skipIf(sys.pypy_version_info[:2] >= (2,6),
"This test was removed in CPython 2.7.9, and fails under PyPy 2.6 "
"with 'RuntimeError: stream lock is not held' in random_io")
def test_6_daemon_threads(self):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
script = """if True:
import os
import random
import sys
import time
import threading
thread_has_run = set()
def random_io():
'''Loop for a while sleeping random tiny amounts and doing some I/O.'''
while True:
in_f = open(os.__file__, 'rb')
stuff = in_f.read(200)
null_f = open(os.devnull, 'wb')
null_f.write(stuff)
time.sleep(random.random() / 1995)
null_f.close()
in_f.close()
thread_has_run.add(threading.current_thread())
def main():
count = 0
for _ in range(40):
new_thread = threading.Thread(target=random_io)
new_thread.daemon = True
new_thread.start()
count += 1
while len(thread_has_run) < count:
time.sleep(0.001)
# Trigger process shutdown
sys.exit(0)
main()
"""
rc, out, err = assert_python_ok('-c', script)
self.assertFalse(err)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_reinit_tls_after_fork(self):
......@@ -791,7 +745,7 @@ class ThreadJoinOnShutdown(BaseTestCase):
def generator():
while 1:
yield "genereator"
yield "generator"
def callback():
if callback.gen is None:
......@@ -838,6 +792,85 @@ class ThreadingExceptionTests(BaseTestCase):
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
def test_print_exception(self):
script = r"""if 1:
import threading
import time
running = False
def run():
global running
running = True
while running:
time.sleep(0.01)
1.0/0.0
t = threading.Thread(target=run)
t.start()
while not running:
time.sleep(0.01)
running = False
t.join()
"""
rc, out, err = assert_python_ok("-c", script)
self.assertEqual(out, '')
self.assertIn("Exception in thread", err)
self.assertIn("Traceback (most recent call last):", err)
self.assertIn("ZeroDivisionError", err)
self.assertNotIn("Unhandled exception", err)
def test_print_exception_stderr_is_none_1(self):
script = r"""if 1:
import sys
import threading
import time
running = False
def run():
global running
running = True
while running:
time.sleep(0.01)
1.0/0.0
t = threading.Thread(target=run)
t.start()
while not running:
time.sleep(0.01)
sys.stderr = None
running = False
t.join()
"""
rc, out, err = assert_python_ok("-c", script)
self.assertEqual(out, '')
self.assertIn("Exception in thread", err)
self.assertIn("Traceback (most recent call last):", err)
self.assertIn("ZeroDivisionError", err)
self.assertNotIn("Unhandled exception", err)
def test_print_exception_stderr_is_none_2(self):
script = r"""if 1:
import sys
import threading
import time
running = False
def run():
global running
running = True
while running:
time.sleep(0.01)
1.0/0.0
sys.stderr = None
t = threading.Thread(target=run)
t.start()
while not running:
time.sleep(0.01)
running = False
t.join()
"""
rc, out, err = assert_python_ok("-c", script)
self.assertEqual(out, '')
self.assertNotIn("Unhandled exception", err)
class LockTests(lock_tests.LockTests):
locktype = staticmethod(threading.Lock)
......@@ -849,7 +882,7 @@ class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
# An Condition uses an RLock by default and exports its API.
# Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):
......
import unittest
from doctest import DocTestSuite
from test import test_support
from test import test_support as support
import weakref
import gc
# Modules under test
_thread = test_support.import_module('thread')
threading = test_support.import_module('threading')
_thread = support.import_module('thread')
threading = support.import_module('threading')
import _threading_local
......@@ -35,7 +35,6 @@ class BaseLocalTest:
del t
gc.collect()
gc.collect() # needed when run through gevent; why?
self.assertEqual(len(weaklist), n)
# XXX _threading_local keeps the local of the last stopped thread alive.
......@@ -64,14 +63,9 @@ class BaseLocalTest:
# Simply check that the variable is correctly set
self.assertEqual(local.x, i)
threads= []
for i in range(10):
t = threading.Thread(target=f, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
with support.start_threads(threading.Thread(target=f, args=(i,))
for i in range(10)):
pass
def test_derived_cycle_dealloc(self):
# http://bugs.python.org/issue6990
......@@ -174,7 +168,7 @@ class BaseLocalTest:
obj = cls()
obj.x = 5
self.assertEqual(obj.__dict__, {'x': 5})
if test_support.check_impl_detail():
if support.check_impl_detail():
with self.assertRaises(AttributeError):
obj.__dict__ = {}
with self.assertRaises(AttributeError):
......@@ -203,7 +197,7 @@ class ThreadLocalTest(unittest.TestCase, BaseLocalTest):
wr = weakref.ref(x)
del x
gc.collect()
self.assertIs(wr(), None)
self.assertIsNone(wr())
class PyThreadingLocalTest(unittest.TestCase, BaseLocalTest):
_local = _threading_local.local
......@@ -230,7 +224,7 @@ def test_main():
setUp=setUp, tearDown=tearDown)
)
test_support.run_unittest(suite)
support.run_unittest(suite)
if __name__ == '__main__':
test_main()
This diff is collapsed.
import unittest
from test import test_support
from test import test_urllib
import os
import socket
import StringIO
import urllib2
from urllib2 import Request, OpenerDirector
from urllib2 import Request, OpenerDirector, AbstractDigestAuthHandler
import httplib
try:
import ssl
except ImportError:
ssl = None
# XXX
# Request
......@@ -20,7 +27,7 @@ class TrivialTests(unittest.TestCase):
self.assertRaises(ValueError, urllib2.urlopen, 'bogus url')
# XXX Name hacking to get this to work on Windows.
fname = os.path.abspath(urllib2.__file__).replace('\\', '/')
fname = os.path.abspath(urllib2.__file__).replace(os.sep, '/')
# And more hacking to get it to work on MacOS. This assumes
# urllib.pathname2url works, unfortunately...
......@@ -47,6 +54,14 @@ class TrivialTests(unittest.TestCase):
for string, list in tests:
self.assertEqual(urllib2.parse_http_list(string), list)
@unittest.skipUnless(ssl, "ssl module required")
def test_cafile_and_context(self):
context = ssl.create_default_context()
with self.assertRaises(ValueError):
urllib2.urlopen(
"https://localhost", cafile="/nonexistent/path", context=context
)
def test_request_headers_dict():
"""
......@@ -278,8 +293,8 @@ class MockHTTPClass:
self.req_headers = []
self.data = None
self.raise_on_endheaders = False
self.sock = None
self._tunnel_headers = {}
self.sock = None # gevent: compat with 2.7.0+
def __call__(self, host, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
self.host = host
......@@ -408,7 +423,7 @@ class MockHTTPHandler(urllib2.BaseHandler):
self._count = 0
self.requests = []
def http_open(self, req):
import mimetools, httplib, copy
import mimetools, copy
from StringIO import StringIO
self.requests.append(copy.deepcopy(req))
if self._count == 0:
......@@ -1026,6 +1041,22 @@ class HandlerTests(unittest.TestCase):
fp = o.open('http://www.example.com')
self.assertEqual(fp.geturl(), redirected_url.strip())
def test_redirect_no_path(self):
# Issue 14132: Relative redirect strips original path
real_class = httplib.HTTPConnection
response1 = b"HTTP/1.1 302 Found\r\nLocation: ?query\r\n\r\n"
httplib.HTTPConnection = test_urllib.fakehttp(response1)
self.addCleanup(setattr, httplib, "HTTPConnection", real_class)
urls = iter(("/path", "/path?query"))
def request(conn, method, url, *pos, **kw):
self.assertEqual(url, next(urls))
real_class.request(conn, method, url, *pos, **kw)
# Change response for subsequent connection
conn.__class__.fakedata = b"HTTP/1.1 200 OK\r\n\r\nHello!"
httplib.HTTPConnection.request = request
fp = urllib2.urlopen("http://python.org/path")
self.assertEqual(fp.geturl(), "http://python.org/path?query")
def test_proxy(self):
o = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
......@@ -1280,6 +1311,16 @@ class MiscTests(unittest.TestCase):
else:
self.assertTrue(False)
def test_unsupported_algorithm(self):
handler = AbstractDigestAuthHandler()
with self.assertRaises(ValueError) as exc:
handler.get_algorithm_impls('invalid')
self.assertEqual(
str(exc.exception),
"Unsupported digest authentication algorithm 'invalid'"
)
class RequestTests(unittest.TestCase):
def setUp(self):
......@@ -1340,6 +1381,11 @@ class RequestTests(unittest.TestCase):
req = Request(url)
self.assertEqual(req.get_full_url(), url)
def test_private_attributes(self):
self.assertFalse(hasattr(self.get, '_Request__r_xxx'))
# Issue #6500: infinite recursion
self.assertFalse(hasattr(self.get, '_Request__r_method'))
def test_HTTPError_interface(self):
"""
Issue 13211 reveals that HTTPError didn't implement the URLError
......
import os
import base64
import urlparse
import urllib2
import BaseHTTPServer
......@@ -9,6 +11,17 @@ from test import test_support
mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')
try:
import ssl
except ImportError:
ssl = None
here = os.path.dirname(__file__)
# Self-signed cert file for 'localhost'
CERT_localhost = os.path.join(here, 'keycert.pem')
# Self-signed cert file for 'fakehostname'
CERT_fakehostname = os.path.join(here, 'keycert2.pem')
# Loopback http server infrastructure
class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
......@@ -23,7 +36,7 @@ class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
# Set the timeout of our listening socket really low so
# that we can stop the server easily.
self.socket.settimeout(1.0)
self.socket.settimeout(0.1)
def get_request(self):
"""BaseHTTPServer method, overridden."""
......@@ -66,6 +79,46 @@ class LoopbackHttpServerThread(threading.Thread):
# Authentication infrastructure
class BasicAuthHandler(BaseHTTPServer.BaseHTTPRequestHandler):
"""Handler for performing Basic Authentication."""
# Server side values
USER = "testUser"
PASSWD = "testPass"
REALM = "Test"
USER_PASSWD = "%s:%s" % (USER, PASSWD)
ENCODED_AUTH = base64.b64encode(USER_PASSWD)
def __init__(self, *args, **kwargs):
BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *args, **kwargs)
def log_message(self, format, *args):
# Suppress the HTTP Console log output
pass
def do_HEAD(self):
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
def do_AUTHHEAD(self):
self.send_response(401)
self.send_header("WWW-Authenticate", "Basic realm=\"%s\"" % self.REALM)
self.send_header("Content-type", "text/html")
self.end_headers()
def do_GET(self):
if self.headers.getheader("Authorization") == None:
self.do_AUTHHEAD()
self.wfile.write("No Auth Header Received")
elif self.headers.getheader(
"Authorization") == "Basic " + self.ENCODED_AUTH:
self.wfile.write("It works!")
else:
# Unauthorized Request
self.do_AUTHHEAD()
class DigestAuthHandler:
"""Handler for performing digest authentication."""
......@@ -228,6 +281,45 @@ class BaseTestCase(unittest.TestCase):
test_support.threading_cleanup(*self._threads)
class BasicAuthTests(BaseTestCase):
USER = "testUser"
PASSWD = "testPass"
INCORRECT_PASSWD = "Incorrect"
REALM = "Test"
def setUp(self):
super(BasicAuthTests, self).setUp()
# With Basic Authentication
def http_server_with_basic_auth_handler(*args, **kwargs):
return BasicAuthHandler(*args, **kwargs)
self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
self.server_url = 'http://127.0.0.1:%s' % self.server.port
self.server.start()
self.server.ready.wait()
def tearDown(self):
self.server.stop()
super(BasicAuthTests, self).tearDown()
def test_basic_auth_success(self):
ah = urllib2.HTTPBasicAuthHandler()
ah.add_password(self.REALM, self.server_url, self.USER, self.PASSWD)
urllib2.install_opener(urllib2.build_opener(ah))
try:
self.assertTrue(urllib2.urlopen(self.server_url))
except urllib2.HTTPError:
self.fail("Basic Auth Failed for url: %s" % self.server_url)
except Exception as e:
raise e
def test_basic_auth_httperror(self):
ah = urllib2.HTTPBasicAuthHandler()
ah.add_password(self.REALM, self.server_url, self.USER,
self.INCORRECT_PASSWD)
urllib2.install_opener(urllib2.build_opener(ah))
self.assertRaises(urllib2.HTTPError, urllib2.urlopen, self.server_url)
class ProxyAuthTests(BaseTestCase):
URL = "http://localhost"
......@@ -237,9 +329,18 @@ class ProxyAuthTests(BaseTestCase):
def setUp(self):
super(ProxyAuthTests, self).setUp()
# Ignore proxy bypass settings in the environment.
def restore_environ(old_environ):
os.environ.clear()
os.environ.update(old_environ)
self.addCleanup(restore_environ, os.environ.copy())
os.environ['NO_PROXY'] = ''
os.environ['no_proxy'] = ''
self.digest_auth_handler = DigestAuthHandler()
self.digest_auth_handler.set_users({self.USER: self.PASSWD})
self.digest_auth_handler.set_realm(self.REALM)
# With Digest Authentication
def create_fake_proxy_handler(*args, **kwargs):
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
......@@ -352,6 +453,19 @@ class TestUrlopen(BaseTestCase):
urllib2.install_opener(opener)
super(TestUrlopen, self).setUp()
def urlopen(self, url, data=None, **kwargs):
l = []
f = urllib2.urlopen(url, data, **kwargs)
try:
# Exercise various methods
l.extend(f.readlines(200))
l.append(f.readline())
l.append(f.read(1024))
l.append(f.read())
finally:
f.close()
return b"".join(l)
def start_server(self, responses):
handler = GetRequestHandler(responses)
......@@ -362,6 +476,16 @@ class TestUrlopen(BaseTestCase):
handler.port = port
return handler
def start_https_server(self, responses=None, **kwargs):
if not hasattr(urllib2, 'HTTPSHandler'):
self.skipTest('ssl support required')
from test.ssl_servers import make_https_server
if responses is None:
responses = [(200, [], b"we care a bit")]
handler = GetRequestHandler(responses)
server = make_https_server(self, handler_class=handler, **kwargs)
handler.port = server.port
return handler
def test_redirection(self):
expected_response = 'We got here...'
......@@ -432,6 +556,49 @@ class TestUrlopen(BaseTestCase):
finally:
self.server.stop()
def test_https(self):
handler = self.start_https_server()
context = ssl.create_default_context(cafile=CERT_localhost)
data = self.urlopen("https://localhost:%s/bizarre" % handler.port, context=context)
self.assertEqual(data, b"we care a bit")
def test_https_with_cafile(self):
handler = self.start_https_server(certfile=CERT_localhost)
# Good cert
data = self.urlopen("https://localhost:%s/bizarre" % handler.port,
cafile=CERT_localhost)
self.assertEqual(data, b"we care a bit")
# Bad cert
with self.assertRaises(urllib2.URLError):
self.urlopen("https://localhost:%s/bizarre" % handler.port,
cafile=CERT_fakehostname)
# Good cert, but mismatching hostname
handler = self.start_https_server(certfile=CERT_fakehostname)
with self.assertRaises(ssl.CertificateError):
self.urlopen("https://localhost:%s/bizarre" % handler.port,
cafile=CERT_fakehostname)
def test_https_with_cadefault(self):
handler = self.start_https_server(certfile=CERT_localhost)
# Self-signed cert should fail verification with system certificate store
with self.assertRaises(urllib2.URLError):
self.urlopen("https://localhost:%s/bizarre" % handler.port,
cadefault=True)
def test_https_sni(self):
if ssl is None:
self.skipTest("ssl module required")
if not ssl.HAS_SNI:
self.skipTest("SNI support required in OpenSSL")
sni_name = [None]
def cb_sni(ssl_sock, server_name, initial_context):
sni_name[0] = server_name
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context.set_servername_callback(cb_sni)
handler = self.start_https_server(context=context, certfile=CERT_localhost)
context = ssl.create_default_context(cafile=CERT_localhost)
self.urlopen("https://localhost:%s" % handler.port, context=context)
self.assertEqual(sni_name[0], "localhost")
def test_sending_headers(self):
handler = self.start_server([(200, [], "we don't care")])
......@@ -544,7 +711,7 @@ def test_main():
# the next line.
#test_support.requires("network")
test_support.run_unittest(ProxyAuthTests, TestUrlopen)
test_support.run_unittest(BasicAuthTests, ProxyAuthTests, TestUrlopen)
if __name__ == "__main__":
test_main()
......@@ -80,11 +80,11 @@ class CloseSocketTest(unittest.TestCase):
# delve deep into response to fetch socket._socketobject
response = _urlopen_with_retry("http://www.example.com/")
abused_fileobject = response.fp
#self.assertIs(abused_fileobject.__class__, socket._fileobject) JAM gevent: disable
# self.assertIs(abused_fileobject.__class__, socket._fileobject) # gevent: disable
httpresponse = abused_fileobject._sock
self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
fileobject = httpresponse.fp
#self.assertIs(fileobject.__class__, socket._fileobject) JAM gevent: disable
# self.assertIs(fileobject.__class__, socket._fileobject) # gevent: disable
self.assertTrue(not fileobject.closed)
response.close()
......@@ -102,11 +102,9 @@ class OtherNetworkTests(unittest.TestCase):
def test_ftp(self):
urls = [
'ftp://ftp.kernel.org/pub/linux/kernel/README',
'ftp://ftp.kernel.org/pub/linux/kernel/non-existent-file',
#'ftp://ftp.kernel.org/pub/leenox/kernel/test',
'ftp://gatekeeper.research.compaq.com/pub/DEC/SRC'
'/research-reports/00README-Legal-Rules-Regs',
'ftp://ftp.debian.org/debian/README',
('ftp://ftp.debian.org/debian/non-existent-file',
None, urllib2.URLError),
]
self._test_urls(urls, self._extra_handlers())
......@@ -155,12 +153,12 @@ class OtherNetworkTests(unittest.TestCase):
## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
def test_urlwithfrag(self):
urlwith_frag = "https://docs.python.org/2/glossary.html#glossary"
urlwith_frag = "http://www.pythontest.net/index.html#frag"
with test_support.transient_internet(urlwith_frag):
req = urllib2.Request(urlwith_frag)
res = urllib2.urlopen(req)
self.assertEqual(res.geturl(),
"https://docs.python.org/2/glossary.html#glossary")
"http://www.pythontest.net/index.html#frag")
def test_fileno(self):
req = urllib2.Request("http://www.example.com")
......@@ -255,6 +253,7 @@ class TimeoutTest(unittest.TestCase):
with test_support.transient_internet(url, timeout=None):
u = _urlopen_with_retry(url)
self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
u.close()
def test_http_default_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
......@@ -266,6 +265,7 @@ class TimeoutTest(unittest.TestCase):
finally:
socket.setdefaulttimeout(None)
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
u.close()
def test_http_no_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
......@@ -277,20 +277,23 @@ class TimeoutTest(unittest.TestCase):
finally:
socket.setdefaulttimeout(None)
self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
u.close()
def test_http_timeout(self):
url = "http://www.example.com"
with test_support.transient_internet(url):
u = _urlopen_with_retry(url, timeout=120)
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
u.close()
FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
FTP_HOST = 'ftp://ftp.debian.org/debian/'
def test_ftp_basic(self):
self.assertIsNone(socket.getdefaulttimeout())
with test_support.transient_internet(self.FTP_HOST, timeout=None):
u = _urlopen_with_retry(self.FTP_HOST)
self.assertIsNone(u.fp.fp._sock.gettimeout())
u.close()
def test_ftp_default_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
......@@ -301,6 +304,7 @@ class TimeoutTest(unittest.TestCase):
finally:
socket.setdefaulttimeout(None)
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
u.close()
def test_ftp_no_timeout(self):
self.assertIsNone(socket.getdefaulttimeout(),)
......@@ -311,11 +315,16 @@ class TimeoutTest(unittest.TestCase):
finally:
socket.setdefaulttimeout(None)
self.assertIsNone(u.fp.fp._sock.gettimeout())
u.close()
def test_ftp_timeout(self):
with test_support.transient_internet(self.FTP_HOST):
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
try:
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
except:
raise
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
u.close()
def test_main():
......
from __future__ import nested_scopes # Backward compat for 2.1
from unittest import TestCase
from wsgiref.util import setup_testing_defaults
from wsgiref.headers import Headers
from wsgiref.handlers import BaseHandler, BaseCGIHandler
from wsgiref import util
from wsgiref.validate import validator
from wsgiref.simple_server import WSGIServer, WSGIRequestHandler, demo_app
from wsgiref.simple_server import WSGIServer, WSGIRequestHandler
from wsgiref.simple_server import make_server
from StringIO import StringIO
from SocketServer import BaseServer
import os
import re
import sys
......@@ -113,6 +113,11 @@ class IntegrationTests(TestCase):
out, err = run_amock()
self.check_hello(out)
def test_request_length(self):
out, err = run_amock(data="GET " + ("x" * 65537) + " HTTP/1.0\n\n")
self.assertEqual(out.splitlines()[0],
"HTTP/1.0 414 Request-URI Too Long")
def test_validated_hello(self):
out, err = run_amock(validator(hello_app))
# the middleware doesn't support len(), so content-length isn't there
......
......@@ -380,8 +380,9 @@ class TestCaseMetaClass(type):
# as of Dec 2015 it's almost always slower and/or has much worse timer
# resolution
CI_TIMEOUT = 10
if PY3 and PYPY:
# pypy3 is very slow right now
if (PY3 and PYPY) or (PYPY and WIN and LIBUV):
# pypy3 is very slow right now,
# as is PyPy2 on windows (which only has libuv)
CI_TIMEOUT = 15
if PYPY and WIN and LIBUV:
# slow and flaky timeouts
......@@ -423,6 +424,14 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
if hasattr(self, 'cleanup'):
self.cleanup()
self._error = self._none
self._tearDownCloseOnTearDown()
try:
del self.close_on_teardown
except AttributeError:
pass
super(TestCase, self).tearDown()
def _tearDownCloseOnTearDown(self):
# XXX: Should probably reverse this
for x in self.close_on_teardown:
close = getattr(x, 'close', x)
......@@ -430,11 +439,7 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
close()
except Exception:
pass
try:
del self.close_on_teardown
except AttributeError:
pass
super(TestCase, self).tearDown()
@classmethod
def setUpClass(cls):
......@@ -477,6 +482,7 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
return splitext(basename(self.modulename))[0] + '.' + self.testcasename
_none = (None, None, None)
# (context, kind, value)
_error = _none
def expect_one_error(self):
......@@ -484,12 +490,12 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
self._old_handle_error = gevent.get_hub().handle_error
gevent.get_hub().handle_error = self._store_error
def _store_error(self, where, type, value, tb):
def _store_error(self, where, t, value, tb):
del tb
if self._error != self._none:
gevent.get_hub().parent.throw(type, value)
gevent.get_hub().parent.throw(t, value)
else:
self._error = (where, type, value)
self._error = (where, t, value)
def peek_error(self):
return self._error
......@@ -500,18 +506,20 @@ class TestCase(TestCaseMetaClass("NewBase", (BaseTestCase,), {})):
finally:
self._error = self._none
def assert_error(self, type=None, value=None, error=None, where_type=None):
def assert_error(self, kind=None, value=None, error=None, where_type=None):
if error is None:
error = self.get_error()
if type is not None:
assert issubclass(error[1], type), error
econtext, ekind, evalue = error
if kind is not None:
self.assertIsInstance(kind, type)
assert issubclass(ekind, kind), error
if value is not None:
if isinstance(value, str):
assert str(error[2]) == value, error
self.assertEqual(str(evalue), value)
else:
assert error[2] is value, error
self.assertIs(evalue, value)
if where_type is not None:
self.assertIsInstance(error[0], where_type)
self.assertIsInstance(econtext, where_type)
return error
if RUNNING_ON_APPVEYOR:
......@@ -881,9 +889,9 @@ else:
# num_fds is unix only. Is num_handles close enough on Windows?
return 0
if RUNNING_ON_TRAVIS:
# XXX: Note: installing psutil on the travis linux vm caused failures in test__makefile_refs.
get_open_files = lsof_get_open_files
#if RUNNING_ON_TRAVIS:
# # XXX: Note: installing psutil on the travis linux vm caused failures in test__makefile_refs.
# get_open_files = lsof_get_open_files
if PYPY:
......
......@@ -15,6 +15,12 @@ import re
TRAVIS = os.environ.get("TRAVIS") == "true"
OSX = sys.platform == 'darwin'
PYPY = hasattr(sys, 'pypy_version_info')
WIN = sys.platform.startswith("win")
# XXX: Formalize this better
LIBUV = os.getenv('GEVENT_CORE_CFFI_ONLY') == 'libuv' or (PYPY and WIN)
# By default, test cases are expected to switch and emit warnings if there was none
# If a test is found in this list, it's expected not to switch.
......@@ -184,7 +190,7 @@ if 'thread' in os.getenv('GEVENT_FILE', ''):
]
if os.getenv('GEVENT_CORE_CFFI_ONLY') == 'libuv':
if LIBUV:
# epoll appears to work with these just fine in some cases;
# kqueue (at least on OS X, the only tested kqueue system)
# never does (failing with abort())
......@@ -218,6 +224,14 @@ if os.getenv('GEVENT_CORE_CFFI_ONLY') == 'libuv':
'test_selectors.PollSelectorTestCase.test_timeout',
]
if WIN and PYPY:
# From PyPy2-v5.9.0, but tested against tests from
# python 2.7.8, which do work on linux and darwin
disabled_tests += [
# appears to timeout?
'test_threading.ThreadTests.test_finalize_with_trace'
]
def _make_run_with_original(mod_name, func_name):
@contextlib.contextmanager
def with_orig():
......@@ -517,6 +531,8 @@ if hasattr(sys, 'pypy_version_info') and sys.pypy_version_info[:4] in ( # pylint
'test_threading.ThreadJoinOnShutdown.test_1_join_on_shutdown',
]
if hasattr(sys, 'pypy_version_info'):
wrapped_tests.update({
# XXX: gevent: The error that was raised by that last call
# left a socket open on the server or client. The server gets
......
......@@ -64,7 +64,7 @@ def test():
hub.wait(watcher)
now = time.time()
if now - start <= 0.0:
if now - start - DELAY <= 0.0:
# Sigh. This is especially true on PyPy.
assert WIN, ("Bad timer resolution expected on Windows, test is useless", start, now)
return
......
......@@ -6,6 +6,7 @@ import ssl
import threading
import unittest
import errno
import weakref
from greentest import TestCase
......@@ -112,6 +113,19 @@ class Test(TestCase):
self.assert_open(s, s.fileno())
return s
def _close_on_teardown(self, resource):
# Keeping raw sockets alive keeps SSL sockets
# from being closed too, at least on CPython, so we
# need to use weakrefs
if 'close_on_teardown' not in self.__dict__:
self.close_on_teardown = []
self.close_on_teardown.append(weakref.ref(resource))
return resource
def _tearDownCloseOnTearDown(self):
self.close_on_teardown = [r() for r in self.close_on_teardown if r() is not None]
super(Test, self)._tearDownCloseOnTearDown()
self._tearDownCloseOnTearDown = ()
class TestSocket(Test):
......@@ -275,9 +289,8 @@ class TestSSL(Test):
def test_makefile1(self):
s = self.make_open_socket()
fileno = s.fileno()
s = ssl.wrap_socket(s)
self._close_on_teardown(s)
fileno = s.fileno()
self.assert_open(s, fileno)
......
......@@ -3,13 +3,13 @@ import _six as six
from os import pipe
import gevent
from gevent import os
from greentest import TestCase, main
from greentest import TestCase, main, CI_TIMEOUT
from gevent import Greenlet, joinall
class TestOS_tp(TestCase):
__timeout__ = 5
__timeout__ = CI_TIMEOUT
def pipe(self):
return pipe()
......@@ -83,7 +83,7 @@ if hasattr(os, 'fork_and_watch'):
class TestForkAndWatch(TestCase):
__timeout__ = 5
__timeout__ = CI_TIMEOUT
def test_waitpid_all(self):
# Cover this specific case.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment