Commit 5f264dfb authored by Denis Bilenko's avatar Denis Bilenko

Merge pull request #508 from gevent/pypy_tests

upgrade pypy tests to 2.7.8
parents 1b06377a 7195a761
import signal, subprocess, sys
import signal, subprocess, sys, time
# On Linux this causes os.waitpid to fail with OSError as the OS has already
# reaped our child process. The wait() passing the OSError on to the caller
# and causing us to exit with an error is what we are testing against.
signal.signal(signal.SIGCHLD, signal.SIG_IGN)
subprocess.Popen([sys.executable, '-c', 'print("albatross")']).wait()
# Also ensure poll() handles an errno.ECHILD appropriately.
p = subprocess.Popen([sys.executable, '-c', 'print("albatross")'])
num_polls = 0
while p.poll() is None:
# Waiting for the process to finish.
time.sleep(0.01) # Avoid being a CPU busy loop.
num_polls += 1
if num_polls > 3000:
raise RuntimeError('poll should have returned 0 within 30 seconds')
......@@ -7,9 +7,10 @@ import sys
import time
import warnings
import errno
import struct
from test import test_support
from test.test_support import TESTFN, run_unittest, unlink
from test.test_support import TESTFN, run_unittest, unlink, HOST
from StringIO import StringIO
try:
......@@ -17,7 +18,6 @@ try:
except ImportError:
threading = None
HOST = test_support.HOST
class dummysocket:
def __init__(self):
......@@ -483,8 +483,9 @@ class TCPServer(asyncore.dispatcher):
return self.socket.getsockname()[:2]
def handle_accept(self):
sock, addr = self.accept()
self.handler(sock)
pair = self.accept()
if pair is not None:
self.handler(pair[0])
def handle_error(self):
raise
......@@ -703,6 +704,27 @@ class BaseTestAPI(unittest.TestCase):
finally:
sock.close()
@unittest.skipUnless(threading, 'Threading required for this test.')
@test_support.reap_threads
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
server = TCPServer()
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1, count=500))
t.start()
self.addCleanup(t.join)
for x in xrange(20):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except socket.error:
pass
finally:
s.close()
class TestAPI_UseSelect(BaseTestAPI):
use_poll = False
......
......@@ -15,9 +15,9 @@ try:
except ImportError:
ssl = None
from unittest import TestCase
from unittest import TestCase, SkipTest, skipUnless
from test import test_support
from test.test_support import HOST
from test.test_support import HOST, HOSTv6
threading = test_support.import_module('threading')
......@@ -65,6 +65,7 @@ class DummyFTPHandler(asynchat.async_chat):
self.last_received_data = ''
self.next_response = ''
self.rest = None
self.next_retr_data = RETR_DATA
self.push('220 welcome')
def collect_incoming_data(self, data):
......@@ -189,7 +190,7 @@ class DummyFTPHandler(asynchat.async_chat):
offset = int(self.rest)
else:
offset = 0
self.dtp.push(RETR_DATA[offset:])
self.dtp.push(self.next_retr_data[offset:])
self.dtp.close_when_done()
self.rest = None
......@@ -203,6 +204,11 @@ class DummyFTPHandler(asynchat.async_chat):
self.dtp.push(NLST_DATA)
self.dtp.close_when_done()
def cmd_setlongretr(self, arg):
# For testing. Next RETR will return long line.
self.next_retr_data = 'x' * int(arg)
self.push('125 setlongretr ok')
class DummyFTPServer(asyncore.dispatcher, threading.Thread):
......@@ -474,6 +480,14 @@ class TestFTPClass(TestCase):
def test_rmd(self):
self.client.rmd('foo')
def test_cwd(self):
dir = self.client.cwd('/foo')
self.assertEqual(dir, '250 cwd ok')
def test_mkd(self):
dir = self.client.mkd('/foo')
self.assertEqual(dir, '/foo')
def test_pwd(self):
dir = self.client.pwd()
self.assertEqual(dir, 'pwd ok')
......@@ -550,11 +564,33 @@ class TestFTPClass(TestCase):
# IPv4 is in use, just make sure send_epsv has not been used
self.assertEqual(self.server.handler.last_received_cmd, 'pasv')
def test_line_too_long(self):
self.assertRaises(ftplib.Error, self.client.sendcmd,
'x' * self.client.maxline * 2)
def test_retrlines_too_long(self):
self.client.sendcmd('SETLONGRETR %d' % (self.client.maxline * 2))
received = []
self.assertRaises(ftplib.Error,
self.client.retrlines, 'retr', received.append)
def test_storlines_too_long(self):
f = StringIO.StringIO('x' * self.client.maxline * 2)
self.assertRaises(ftplib.Error, self.client.storlines, 'stor', f)
@skipUnless(socket.has_ipv6, "IPv6 not enabled")
class TestIPv6Environment(TestCase):
@classmethod
def setUpClass(cls):
try:
DummyFTPServer((HOST, 0), af=socket.AF_INET6)
except socket.error:
raise SkipTest("IPv6 not enabled")
def setUp(self):
self.server = DummyFTPServer((HOST, 0), af=socket.AF_INET6)
self.server = DummyFTPServer((HOSTv6, 0), af=socket.AF_INET6)
self.server.start()
self.client = ftplib.FTP()
self.client.connect(self.server.host, self.server.port)
......@@ -587,6 +623,7 @@ class TestIPv6Environment(TestCase):
retr()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClassMixin(TestFTPClass):
"""Repeat TestFTPClass tests starting the TLS layer for both control
and data connections first.
......@@ -602,6 +639,7 @@ class TestTLS_FTPClassMixin(TestFTPClass):
self.client.prot_p()
@skipUnless(ssl, "SSL not available")
class TestTLS_FTPClass(TestCase):
"""Specific TLS_FTP class tests."""
......@@ -702,10 +740,10 @@ class TestTimeouts(TestCase):
def testTimeoutDefault(self):
# default -- use global socket timeout
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
ftp = ftplib.FTP("localhost")
ftp = ftplib.FTP(HOST)
finally:
socket.setdefaulttimeout(None)
self.assertEqual(ftp.sock.gettimeout(), 30)
......@@ -714,13 +752,13 @@ class TestTimeouts(TestCase):
def testTimeoutNone(self):
# no timeout -- do not use global socket timeout
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
ftp = ftplib.FTP("localhost", timeout=None)
ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertTrue(ftp.sock.gettimeout() is None)
self.assertIsNone(ftp.sock.gettimeout())
self.evt.wait()
ftp.close()
......@@ -755,17 +793,9 @@ class TestTimeouts(TestCase):
def test_main():
tests = [TestFTPClass, TestTimeouts]
if socket.has_ipv6:
try:
DummyFTPServer((HOST, 0), af=socket.AF_INET6)
except socket.error:
pass
else:
tests.append(TestIPv6Environment)
if ssl is not None:
tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass])
tests = [TestFTPClass, TestTimeouts,
TestIPv6Environment,
TestTLS_FTPClassMixin, TestTLS_FTPClass]
thread_info = test_support.threading_setup()
try:
......
......@@ -13,10 +13,12 @@ from test import test_support
HOST = test_support.HOST
class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO):
def __init__(self, text, fileclass=StringIO.StringIO, host=None, port=None):
self.text = text
self.fileclass = fileclass
self.data = ''
self.host = host
self.port = port
def sendall(self, data):
self.data += ''.join(data)
......@@ -26,6 +28,9 @@ class FakeSocket:
raise httplib.UnimplementedFileMode()
return self.fileclass(self.text)
def close(self):
pass
class EPipeSocket(FakeSocket):
def __init__(self, text, pipe_trigger):
......@@ -90,12 +95,40 @@ class HeaderTests(TestCase):
conn.request('POST', '/', body, headers)
self.assertEqual(conn._buffer.count[header.lower()], 1)
def test_content_length_0(self):
class ContentLengthChecker(list):
def __init__(self):
list.__init__(self)
self.content_length = None
def append(self, item):
kv = item.split(':', 1)
if len(kv) > 1 and kv[0].lower() == 'content-length':
self.content_length = kv[1].strip()
list.append(self, item)
# POST with empty body
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket(None)
conn._buffer = ContentLengthChecker()
conn.request('POST', '/', '')
self.assertEqual(conn._buffer.content_length, '0',
'Header Content-Length not set')
# PUT request with empty body
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket(None)
conn._buffer = ContentLengthChecker()
conn.request('PUT', '/', '')
self.assertEqual(conn._buffer.content_length, '0',
'Header Content-Length not set')
def test_putheader(self):
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket(None)
conn.putrequest('GET','/')
conn.putheader('Content-length',42)
self.assertTrue('Content-length: 42' in conn._buffer)
self.assertIn('Content-length: 42', conn._buffer)
def test_ipv6host_header(self):
# Default host header on IPv6 transaction should wrapped by [] if
......@@ -125,6 +158,8 @@ class BasicTest(TestCase):
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(0), '') # Issue #20007
self.assertFalse(resp.isclosed())
self.assertEqual(resp.read(), 'Text')
self.assertTrue(resp.isclosed())
......@@ -138,7 +173,7 @@ class BasicTest(TestCase):
self.assertEqual(repr(exc), '''BadStatusLine("\'\'",)''')
def test_partial_reads(self):
# if we have a lenght, the system knows when to close itself
# if we have a length, the system knows when to close itself
# same behaviour than when we read the whole thing with read()
body = "HTTP/1.1 200 Ok\r\nContent-Length: 4\r\n\r\nText"
sock = FakeSocket(body)
......@@ -149,6 +184,32 @@ class BasicTest(TestCase):
self.assertEqual(resp.read(2), 'xt')
self.assertTrue(resp.isclosed())
def test_partial_reads_no_content_length(self):
# when no length is present, the socket should be gracefully closed when
# all data was read
body = "HTTP/1.1 200 Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(2), 'Te')
self.assertFalse(resp.isclosed())
self.assertEqual(resp.read(2), 'xt')
self.assertEqual(resp.read(1), '')
self.assertTrue(resp.isclosed())
def test_partial_reads_incomplete_body(self):
# if the server shuts down the connection before the whole
# content-length is delivered, the socket is gracefully closed
body = "HTTP/1.1 200 Ok\r\nContent-Length: 10\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(2), 'Te')
self.assertFalse(resp.isclosed())
self.assertEqual(resp.read(2), 'xt')
self.assertEqual(resp.read(1), '')
self.assertTrue(resp.isclosed())
def test_host_port(self):
# Check invalid host_port
......@@ -279,7 +340,7 @@ class BasicTest(TestCase):
resp = httplib.HTTPResponse(sock, method="GET")
resp.begin()
self.assertEqual(resp.read(), 'Hello\r\n')
resp.close()
self.assertTrue(resp.isclosed())
def test_incomplete_read(self):
sock = FakeSocket('HTTP/1.1 200 OK\r\nContent-Length: 10\r\n\r\nHello\r\n')
......@@ -293,10 +354,9 @@ class BasicTest(TestCase):
"IncompleteRead(7 bytes read, 3 more expected)")
self.assertEqual(str(i),
"IncompleteRead(7 bytes read, 3 more expected)")
self.assertTrue(resp.isclosed())
else:
self.fail('IncompleteRead expected')
finally:
resp.close()
def test_epipe(self):
sock = EPipeSocket(
......@@ -349,6 +409,14 @@ class BasicTest(TestCase):
resp.begin()
self.assertRaises(httplib.LineTooLong, resp.read)
def test_early_eof(self):
# Test httpresponse with no \r\n termination,
body = "HTTP/1.1 200 Ok"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock)
resp.begin()
self.assertEqual(resp.read(), '')
self.assertTrue(resp.isclosed())
class OfflineTest(TestCase):
def test_responses(self):
......@@ -403,7 +471,7 @@ class TimeoutTest(TestCase):
HTTPConnection and into the socket.
'''
# default -- use global socket timeout
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT)
......@@ -414,7 +482,7 @@ class TimeoutTest(TestCase):
httpConn.close()
# no timeout -- do not use global socket default
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
httpConn = httplib.HTTPConnection(HOST, TimeoutTest.PORT,
......@@ -463,9 +531,48 @@ class HTTPSTimeoutTest(TestCase):
self.fail("Port incorrectly parsed: %s != %s" % (p, c.host))
class TunnelTests(TestCase):
def test_connect(self):
response_text = (
'HTTP/1.0 200 OK\r\n\r\n' # Reply to CONNECT
'HTTP/1.1 200 OK\r\n' # Reply to HEAD
'Content-Length: 42\r\n\r\n'
)
def create_connection(address, timeout=None, source_address=None):
return FakeSocket(response_text, host=address[0], port=address[1])
conn = httplib.HTTPConnection('proxy.com')
conn._create_connection = create_connection
# Once connected, we should not be able to tunnel anymore
conn.connect()
self.assertRaises(RuntimeError, conn.set_tunnel, 'destination.com')
# But if close the connection, we are good.
conn.close()
conn.set_tunnel('destination.com')
conn.request('HEAD', '/', '')
self.assertEqual(conn.sock.host, 'proxy.com')
self.assertEqual(conn.sock.port, 80)
self.assertTrue('CONNECT destination.com' in conn.sock.data)
self.assertTrue('Host: destination.com' in conn.sock.data)
self.assertTrue('Host: proxy.com' not in conn.sock.data)
conn.close()
conn.request('PUT', '/', '')
self.assertEqual(conn.sock.host, 'proxy.com')
self.assertEqual(conn.sock.port, 80)
self.assertTrue('CONNECT destination.com' in conn.sock.data)
self.assertTrue('Host: destination.com' in conn.sock.data)
def test_main(verbose=None):
test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
HTTPSTimeoutTest, SourceAddressTest)
HTTPSTimeoutTest, SourceAddressTest, TunnelTests)
if __name__ == '__main__':
test_main()
......@@ -4,11 +4,6 @@ Written by Cody A.W. Somerville <cody-somerville@ubuntu.com>,
Josip Dzolonga, and Michael Otteneder for the 2007/08 GHOP contest.
"""
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from CGIHTTPServer import CGIHTTPRequestHandler
import CGIHTTPServer
import os
import sys
import re
......@@ -17,12 +12,17 @@ import shutil
import urllib
import httplib
import tempfile
import unittest
import CGIHTTPServer
from StringIO import StringIO
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
from SimpleHTTPServer import SimpleHTTPRequestHandler
from CGIHTTPServer import CGIHTTPRequestHandler
from StringIO import StringIO
from test import test_support
threading = test_support.import_module('threading')
......@@ -43,7 +43,7 @@ class SocketlessRequestHandler(SimpleHTTPRequestHandler):
self.end_headers()
self.wfile.write(b'<html><body>Data</body></html>\r\n')
def log_message(self, format, *args):
def log_message(self, fmt, *args):
pass
......@@ -97,9 +97,9 @@ class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
self.handler = SocketlessRequestHandler()
def send_typical_request(self, message):
input = StringIO(message)
input_msg = StringIO(message)
output = StringIO()
self.handler.rfile = input
self.handler.rfile = input_msg
self.handler.wfile = output
self.handler.handle_one_request()
output.seek(0)
......@@ -114,7 +114,7 @@ class BaseHTTPRequestHandlerTestCase(unittest.TestCase):
def verify_http_server_response(self, response):
match = self.HTTPResponseMatch.search(response)
self.assertTrue(match is not None)
self.assertIsNotNone(match)
def test_http_1_1(self):
result = self.send_typical_request('GET / HTTP/1.1\r\n\r\n')
......@@ -189,7 +189,7 @@ class BaseHTTPServerTestCase(BaseTestCase):
def test_request_line_trimming(self):
self.con._http_vsn_str = 'HTTP/1.1\n'
self.con.putrequest('GET', '/')
self.con.putrequest('XYZBOGUS', '/')
self.con.endheaders()
res = self.con.getresponse()
self.assertEqual(res.status, 501)
......@@ -216,8 +216,9 @@ class BaseHTTPServerTestCase(BaseTestCase):
self.assertEqual(res.status, 501)
def test_version_none(self):
# Test that a valid method is rejected when not HTTP/1.x
self.con._http_vsn_str = ''
self.con.putrequest('PUT', '/')
self.con.putrequest('CUSTOM', '/')
self.con.endheaders()
res = self.con.getresponse()
self.assertEqual(res.status, 400)
......@@ -296,7 +297,7 @@ class SimpleHTTPServerTestCase(BaseTestCase):
os.chdir(self.cwd)
try:
shutil.rmtree(self.tempdir)
except:
except OSError:
pass
finally:
BaseTestCase.tearDown(self)
......@@ -313,6 +314,9 @@ class SimpleHTTPServerTestCase(BaseTestCase):
#constructs the path relative to the root directory of the HTTPServer
response = self.request(self.tempdir_name + '/test')
self.check_status_and_reason(response, 200, data=self.data)
# check for trailing "/" which should return 404. See Issue17324
response = self.request(self.tempdir_name + '/test/')
self.check_status_and_reason(response, 404)
response = self.request(self.tempdir_name + '/')
self.check_status_and_reason(response, 200)
response = self.request(self.tempdir_name)
......@@ -321,17 +325,16 @@ class SimpleHTTPServerTestCase(BaseTestCase):
self.check_status_and_reason(response, 404)
response = self.request('/' + 'ThisDoesNotExist' + '/')
self.check_status_and_reason(response, 404)
f = open(os.path.join(self.tempdir_name, 'index.html'), 'w')
response = self.request('/' + self.tempdir_name + '/')
self.check_status_and_reason(response, 200)
# chmod() doesn't work as expected on Windows, and filesystem
# permissions are ignored by root on Unix.
if os.name == 'posix' and os.geteuid() != 0:
os.chmod(self.tempdir, 0)
response = self.request(self.tempdir_name + '/')
self.check_status_and_reason(response, 404)
os.chmod(self.tempdir, 0755)
with open(os.path.join(self.tempdir_name, 'index.html'), 'w') as fp:
response = self.request('/' + self.tempdir_name + '/')
self.check_status_and_reason(response, 200)
# chmod() doesn't work as expected on Windows, and filesystem
# permissions are ignored by root on Unix.
if os.name == 'posix' and os.geteuid() != 0:
os.chmod(self.tempdir, 0)
response = self.request(self.tempdir_name + '/')
self.check_status_and_reason(response, 404)
os.chmod(self.tempdir, 0755)
def test_head(self):
response = self.request(
......@@ -393,6 +396,11 @@ class CGIHTTPServerTestCase(BaseTestCase):
else:
self.pythonexe = sys.executable
self.nocgi_path = os.path.join(self.parent_dir, 'nocgi.py')
with open(self.nocgi_path, 'w') as fp:
fp.write(cgi_file1 % self.pythonexe)
os.chmod(self.nocgi_path, 0777)
self.file1_path = os.path.join(self.cgi_dir, 'file1.py')
with open(self.file1_path, 'w') as file1:
file1.write(cgi_file1 % self.pythonexe)
......@@ -411,6 +419,7 @@ class CGIHTTPServerTestCase(BaseTestCase):
os.chdir(self.cwd)
if self.pythonexe != sys.executable:
os.remove(self.pythonexe)
os.remove(self.nocgi_path)
os.remove(self.file1_path)
os.remove(self.file2_path)
os.rmdir(self.cgi_dir)
......@@ -418,41 +427,44 @@ class CGIHTTPServerTestCase(BaseTestCase):
finally:
BaseTestCase.tearDown(self)
def test_url_collapse_path_split(self):
def test_url_collapse_path(self):
# verify tail is the last portion and head is the rest on proper urls
test_vectors = {
'': ('/', ''),
'': '//',
'..': IndexError,
'/.//..': IndexError,
'/': ('/', ''),
'//': ('/', ''),
'/\\': ('/', '\\'),
'/.//': ('/', ''),
'cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
'/cgi-bin/file1.py': ('/cgi-bin', 'file1.py'),
'a': ('/', 'a'),
'/a': ('/', 'a'),
'//a': ('/', 'a'),
'./a': ('/', 'a'),
'./C:/': ('/C:', ''),
'/a/b': ('/a', 'b'),
'/a/b/': ('/a/b', ''),
'/a/b/c/..': ('/a/b', ''),
'/a/b/c/../d': ('/a/b', 'd'),
'/a/b/c/../d/e/../f': ('/a/b/d', 'f'),
'/a/b/c/../d/e/../../f': ('/a/b', 'f'),
'/a/b/c/../d/e/.././././..//f': ('/a/b', 'f'),
'/': '//',
'//': '//',
'/\\': '//\\',
'/.//': '//',
'cgi-bin/file1.py': '/cgi-bin/file1.py',
'/cgi-bin/file1.py': '/cgi-bin/file1.py',
'a': '//a',
'/a': '//a',
'//a': '//a',
'./a': '//a',
'./C:/': '/C:/',
'/a/b': '/a/b',
'/a/b/': '/a/b/',
'/a/b/.': '/a/b/',
'/a/b/c/..': '/a/b/',
'/a/b/c/../d': '/a/b/d',
'/a/b/c/../d/e/../f': '/a/b/d/f',
'/a/b/c/../d/e/../../f': '/a/b/f',
'/a/b/c/../d/e/.././././..//f': '/a/b/f',
'../a/b/c/../d/e/.././././..//f': IndexError,
'/a/b/c/../d/e/../../../f': ('/a', 'f'),
'/a/b/c/../d/e/../../../../f': ('/', 'f'),
'/a/b/c/../d/e/../../../f': '/a/f',
'/a/b/c/../d/e/../../../../f': '//f',
'/a/b/c/../d/e/../../../../../f': IndexError,
'/a/b/c/../d/e/../../../../f/..': ('/', ''),
'/a/b/c/../d/e/../../../../f/..': '//',
'/a/b/c/../d/e/../../../../f/../.': '//',
}
for path, expected in test_vectors.iteritems():
if isinstance(expected, type) and issubclass(expected, Exception):
self.assertRaises(expected,
CGIHTTPServer._url_collapse_path_split, path)
CGIHTTPServer._url_collapse_path, path)
else:
actual = CGIHTTPServer._url_collapse_path_split(path)
actual = CGIHTTPServer._url_collapse_path(path)
self.assertEqual(expected, actual,
msg='path = %r\nGot: %r\nWanted: %r' %
(path, actual, expected))
......@@ -462,6 +474,10 @@ class CGIHTTPServerTestCase(BaseTestCase):
self.assertEqual(('Hello World\n', 'text/html', 200),
(res.read(), res.getheader('Content-type'), res.status))
def test_issue19435(self):
res = self.request('///////////nocgi.py/../cgi-bin/nothere.sh')
self.assertEqual(res.status, 404)
def test_post(self):
params = urllib.urlencode({'spam' : 1, 'eggs' : 'python', 'bacon' : 123456})
headers = {'Content-type' : 'application/x-www-form-urlencoded'}
......@@ -495,6 +511,11 @@ class CGIHTTPServerTestCase(BaseTestCase):
(res.read(), res.getheader('Content-type'), res.status))
self.assertEqual(os.environ['SERVER_SOFTWARE'], signature)
def test_urlquote_decoding_in_cgi_check(self):
res = self.request('/cgi-bin%2ffile1.py')
self.assertEqual((b'Hello World\n', 'text/html', 200),
(res.read(), res.getheader('Content-type'), res.status))
class SimpleHTTPRequestHandlerTestCase(unittest.TestCase):
""" Test url parsing """
......
......@@ -43,6 +43,9 @@ class _TriggerThread(threading.Thread):
class BlockingTestMixin:
def tearDown(self):
self.t = None
def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
self.t = _TriggerThread(trigger_func, trigger_args)
self.t.start()
......@@ -79,7 +82,7 @@ class BlockingTestMixin:
self.fail("trigger thread ended but event never set")
class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
class BaseQueueTest(BlockingTestMixin):
def setUp(self):
self.cum = 0
self.cumlock = threading.Lock()
......@@ -191,13 +194,13 @@ class BaseQueueTest(unittest.TestCase, BlockingTestMixin):
self.simple_queue_test(q)
class QueueTest(BaseQueueTest):
class QueueTest(BaseQueueTest, unittest.TestCase):
type2test = Queue.Queue
class LifoQueueTest(BaseQueueTest):
class LifoQueueTest(BaseQueueTest, unittest.TestCase):
type2test = Queue.LifoQueue
class PriorityQueueTest(BaseQueueTest):
class PriorityQueueTest(BaseQueueTest, unittest.TestCase):
type2test = Queue.PriorityQueue
......@@ -222,7 +225,7 @@ class FailingQueue(Queue.Queue):
raise FailingQueueException, "You Lose"
return Queue.Queue._get(self)
class FailingQueueTest(unittest.TestCase, BlockingTestMixin):
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
def failing_queue_test(self, q):
if not q.empty():
......
......@@ -49,6 +49,15 @@ class SelectTestCase(unittest.TestCase):
self.fail('Unexpected return values from select():', rfd, wfd, xfd)
p.close()
# Issue 16230: Crash on select resized list
def test_select_mutated(self):
a = []
class F:
def fileno(self):
del a[-1]
return sys.__stdout__.fileno()
a[:] = [F()] * 10
self.assertEqual(select.select([], a, []), ([], a[:5], []))
def test_main():
test_support.run_unittest(SelectTestCase)
......
......@@ -109,7 +109,7 @@ class InterProcessSignalTests(unittest.TestCase):
# This wait should be interrupted by the signal's exception.
self.wait(child)
time.sleep(1) # Give the signal time to be delivered.
self.fail('HandlerBCalled exception not thrown')
self.fail('HandlerBCalled exception not raised')
except HandlerBCalled:
self.assertTrue(self.b_called)
self.assertFalse(self.a_called)
......@@ -148,7 +148,7 @@ class InterProcessSignalTests(unittest.TestCase):
# test-running process from all the signals. It then
# communicates with that child process over a pipe and
# re-raises information about any exceptions the child
# throws. The real work happens in self.run_test().
# raises. The real work happens in self.run_test().
os_done_r, os_done_w = os.pipe()
with closing(os.fdopen(os_done_r)) as done_r, \
closing(os.fdopen(os_done_w, 'w')) as done_w:
......@@ -227,6 +227,13 @@ class WindowsSignalTests(unittest.TestCase):
signal.signal(7, handler)
class WakeupFDTests(unittest.TestCase):
def test_invalid_fd(self):
fd = test_support.make_bad_fd()
self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
TIMEOUT_FULL = 10
......@@ -485,8 +492,9 @@ class ItimerTest(unittest.TestCase):
def test_main():
test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
WakeupSignalTests, SiginterruptTest,
ItimerTest, WindowsSignalTests)
WakeupFDTests, WakeupSignalTests,
SiginterruptTest, ItimerTest,
WindowsSignalTests)
if __name__ == "__main__":
......
......@@ -77,7 +77,7 @@ class GeneralTests(unittest.TestCase):
smtp.close()
def testTimeoutDefault(self):
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
smtp = smtplib.SMTP(HOST, self.port)
......@@ -87,13 +87,13 @@ class GeneralTests(unittest.TestCase):
smtp.close()
def testTimeoutNone(self):
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
socket.setdefaulttimeout(30)
try:
smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertTrue(smtp.sock.gettimeout() is None)
self.assertIsNone(smtp.sock.gettimeout())
smtp.close()
def testTimeoutValue(self):
......
#!/usr/bin/env python
import unittest
from test import test_support
......@@ -333,28 +331,29 @@ class GeneralModuleTests(unittest.TestCase):
ip = socket.gethostbyname(hostname)
except socket.error:
# Probably name lookup wasn't set up right; skip this test
return
self.skipTest('name lookup failure')
self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
try:
hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
except socket.error:
# Probably a similar problem as above; skip this test
return
self.skipTest('address lookup failure')
all_host_names = [hostname, hname] + aliases
fqhn = socket.getfqdn(ip)
if not fqhn in all_host_names:
self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
@unittest.skipUnless(hasattr(sys, 'getrefcount'),
'test needs sys.getrefcount()')
def testRefCountGetNameInfo(self):
# Testing reference count for getnameinfo
if hasattr(sys, "getrefcount"):
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
self.assertEqual(sys.getrefcount(__name__), orig,
"socket.getnameinfo loses a reference")
try:
# On some versions, this loses a reference
orig = sys.getrefcount(__name__)
socket.getnameinfo(__name__,0)
except TypeError:
self.assertEqual(sys.getrefcount(__name__), orig,
"socket.getnameinfo loses a reference")
def testInterpreterCrash(self):
# Making sure getnameinfo doesn't crash the interpreter
......@@ -417,7 +416,7 @@ class GeneralModuleTests(unittest.TestCase):
# Try same call with optional protocol omitted
port2 = socket.getservbyname(service)
eq(port, port2)
# Try udp, but don't barf it it doesn't exist
# Try udp, but don't barf if it doesn't exist
try:
udpport = socket.getservbyname(service, 'udp')
except socket.error:
......@@ -461,17 +460,17 @@ class GeneralModuleTests(unittest.TestCase):
# Check that setting it to an invalid type raises TypeError
self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
@unittest.skipUnless(hasattr(socket, 'inet_aton'),
'test needs socket.inet_aton()')
def testIPv4_inet_aton_fourbytes(self):
if not hasattr(socket, 'inet_aton'):
return # No inet_aton, nothing to check
# Test that issue1008086 and issue767150 are fixed.
# It must return 4 bytes.
self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv4toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
from socket import inet_aton as f, inet_pton, AF_INET
g = lambda a: inet_pton(AF_INET, a)
......@@ -486,15 +485,15 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
@unittest.skipUnless(hasattr(socket, 'inet_pton'),
'test needs socket.inet_pton()')
def testIPv6toString(self):
if not hasattr(socket, 'inet_pton'):
return # No inet_pton() on this platform
try:
from socket import inet_pton, AF_INET6, has_ipv6
if not has_ipv6:
return
self.skipTest('IPv6 not available')
except ImportError:
return
self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_pton(AF_INET6, a)
self.assertEqual('\x00' * 16, f('::'))
......@@ -505,9 +504,9 @@ class GeneralModuleTests(unittest.TestCase):
f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
)
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv4(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
from socket import inet_ntoa as f, inet_ntop, AF_INET
g = lambda a: inet_ntop(AF_INET, a)
......@@ -520,15 +519,15 @@ class GeneralModuleTests(unittest.TestCase):
self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
@unittest.skipUnless(hasattr(socket, 'inet_ntop'),
'test needs socket.inet_ntop()')
def testStringToIPv6(self):
if not hasattr(socket, 'inet_ntop'):
return # No inet_ntop() on this platform
try:
from socket import inet_ntop, AF_INET6, has_ipv6
if not has_ipv6:
return
self.skipTest('IPv6 not available')
except ImportError:
return
self.skipTest('could not import needed symbols from socket')
f = lambda a: inet_ntop(AF_INET6, a)
self.assertEqual('::', f('\x00' * 16))
......@@ -568,7 +567,7 @@ class GeneralModuleTests(unittest.TestCase):
my_ip_addr = socket.gethostbyname(socket.gethostname())
except socket.error:
# Probably name lookup wasn't set up right; skip this test
return
self.skipTest('name lookup failure')
self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
self.assertEqual(name[1], port)
......@@ -647,9 +646,10 @@ class GeneralModuleTests(unittest.TestCase):
if SUPPORTS_IPV6:
socket.getaddrinfo('::1', 80)
# port can be a string service name such as "http", a numeric
# port number or None
# port number (int or long), or None
socket.getaddrinfo(HOST, "http")
socket.getaddrinfo(HOST, 80)
socket.getaddrinfo(HOST, 80L)
socket.getaddrinfo(HOST, None)
# test family and socktype filters
infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
......@@ -666,6 +666,15 @@ class GeneralModuleTests(unittest.TestCase):
socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
socket.AI_PASSIVE)
# Issue 17269: test workaround for OS X platform bug segfault
if hasattr(socket, 'AI_NUMERICSERV'):
try:
# The arguments here are undefined and the call may succeed
# or fail. All we care here is that it doesn't segfault.
socket.getaddrinfo("localhost", None, 0, 0, 0,
socket.AI_NUMERICSERV)
except socket.gaierror:
pass
def check_sendall_interrupted(self, with_timeout):
# socketpair() is not stricly required, but it makes things easier.
......@@ -686,11 +695,12 @@ class GeneralModuleTests(unittest.TestCase):
c.settimeout(1.5)
with self.assertRaises(ZeroDivisionError):
signal.alarm(1)
c.sendall(b"x" * (1024**2))
c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
if with_timeout:
signal.signal(signal.SIGALRM, ok_handler)
signal.alarm(1)
self.assertRaises(socket.timeout, c.sendall, b"x" * (1024**2))
self.assertRaises(socket.timeout, c.sendall,
b"x" * test_support.SOCK_MAX_SIZE)
finally:
signal.signal(signal.SIGALRM, old_alarm)
c.close()
......@@ -702,11 +712,20 @@ class GeneralModuleTests(unittest.TestCase):
def test_sendall_interrupted_with_timeout(self):
self.check_sendall_interrupted(True)
def testListenBacklog0(self):
def test_listen_backlog(self):
for backlog in 0, -1:
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind((HOST, 0))
srv.listen(backlog)
srv.close()
@test_support.cpython_only
def test_listen_backlog_overflow(self):
# Issue 15989
import _testcapi
srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.bind((HOST, 0))
# backlog = 0
srv.listen(0)
self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
srv.close()
@unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
......@@ -776,10 +795,10 @@ class BasicTCPTest(SocketConnectedTest):
big_chunk = 'f' * 2048
self.serv_conn.sendall(big_chunk)
@unittest.skipUnless(hasattr(socket, 'fromfd'),
'socket.fromfd not availble')
def testFromFd(self):
# Testing fromfd()
if not hasattr(socket, "fromfd"):
return # On Windows, this doesn't exist
fd = self.cli_conn.fileno()
sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
self.addCleanup(sock.close)
......@@ -812,6 +831,19 @@ class BasicTCPTest(SocketConnectedTest):
self.serv_conn.send(MSG)
self.serv_conn.shutdown(2)
testShutdown_overflow = test_support.cpython_only(testShutdown)
@test_support.cpython_only
def _testShutdown_overflow(self):
import _testcapi
self.serv_conn.send(MSG)
# Issue 15989
self.assertRaises(OverflowError, self.serv_conn.shutdown,
_testcapi.INT_MAX + 1)
self.assertRaises(OverflowError, self.serv_conn.shutdown,
2 + (_testcapi.UINT_MAX + 1))
self.serv_conn.shutdown(2)
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicUDPTest(ThreadedUDPSocketTest):
......@@ -857,6 +889,8 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.cli.connect((HOST, self.port))
time.sleep(1.0)
@unittest.skipUnless(hasattr(socket, 'socketpair'),
'test needs socket.socketpair()')
@unittest.skipUnless(thread, 'Threading required for this test.')
class BasicSocketPairTest(SocketPairTest):
......@@ -885,7 +919,10 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def testSetBlocking(self):
# Testing whether set blocking works
self.serv.setblocking(0)
self.serv.setblocking(True)
self.assertIsNone(self.serv.gettimeout())
self.serv.setblocking(False)
self.assertEqual(self.serv.gettimeout(), 0.0)
start = time.time()
try:
self.serv.accept()
......@@ -897,6 +934,19 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testSetBlocking(self):
pass
@test_support.cpython_only
def testSetBlocking_overflow(self):
# Issue 15989
import _testcapi
if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
self.skipTest('needs UINT_MAX < ULONG_MAX')
self.serv.setblocking(False)
self.assertEqual(self.serv.gettimeout(), 0.0)
self.serv.setblocking(_testcapi.UINT_MAX + 1)
self.assertIsNone(self.serv.gettimeout())
_testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking)
def testAccept(self):
# Testing non-blocking accept
self.serv.setblocking(0)
......@@ -964,8 +1014,8 @@ class FileObjectClassTestCase(SocketConnectedTest):
def tearDown(self):
self.serv_file.close()
self.assertTrue(self.serv_file.closed)
self.serv_file = None
SocketConnectedTest.tearDown(self)
self.serv_file = None
def clientSetUp(self):
SocketConnectedTest.clientSetUp(self)
......@@ -1156,6 +1206,64 @@ class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
bufsize = 1 # Default-buffered for reading; line-buffered for writing
class SocketMemo(object):
"""A wrapper to keep track of sent data, needed to examine write behaviour"""
def __init__(self, sock):
self._sock = sock
self.sent = []
def send(self, data, flags=0):
n = self._sock.send(data, flags)
self.sent.append(data[:n])
return n
def sendall(self, data, flags=0):
self._sock.sendall(data, flags)
self.sent.append(data)
def __getattr__(self, attr):
return getattr(self._sock, attr)
def getsent(self):
return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent]
def setUp(self):
FileObjectClassTestCase.setUp(self)
self.serv_file._sock = self.SocketMemo(self.serv_file._sock)
def testLinebufferedWrite(self):
# Write two lines, in small chunks
msg = MSG.strip()
print >> self.serv_file, msg,
print >> self.serv_file, msg
# second line:
print >> self.serv_file, msg,
print >> self.serv_file, msg,
print >> self.serv_file, msg
# third line
print >> self.serv_file, ''
self.serv_file.flush()
msg1 = "%s %s\n"%(msg, msg)
msg2 = "%s %s %s\n"%(msg, msg, msg)
msg3 = "\n"
self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3])
def _testLinebufferedWrite(self):
msg = MSG.strip()
msg1 = "%s %s\n"%(msg, msg)
msg2 = "%s %s %s\n"%(msg, msg, msg)
msg3 = "\n"
l1 = self.cli_file.readline()
self.assertEqual(l1, msg1)
l2 = self.cli_file.readline()
self.assertEqual(l2, msg2)
l3 = self.cli_file.readline()
self.assertEqual(l3, msg3)
class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
......@@ -1203,7 +1311,26 @@ class NetworkConnectionNoServer(unittest.TestCase):
port = test_support.find_unused_port()
with self.assertRaises(socket.error) as cm:
socket.create_connection((HOST, port))
self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
# Issue #16257: create_connection() calls getaddrinfo() against
# 'localhost'. This may result in an IPV6 addr being returned
# as well as an IPV4 one:
# >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
# >>> [(2, 2, 0, '', ('127.0.0.1', 41230)),
# (26, 2, 0, '', ('::1', 41230, 0, 0))]
#
# create_connection() enumerates through all the addresses returned
# and if it doesn't successfully bind to any of them, it propagates
# the last exception it encountered.
#
# On Solaris, ENETUNREACH is returned in this circumstance instead
# of ECONNREFUSED. So, if that errno exists, add it to our list of
# expected errnos.
expected_errnos = [ errno.ECONNREFUSED, ]
if hasattr(errno, 'ENETUNREACH'):
expected_errnos.append(errno.ENETUNREACH)
self.assertIn(cm.exception.errno, expected_errnos)
def test_create_connection_timeout(self):
# Issue #9792: create_connection() should not recast timeout errors
......@@ -1363,12 +1490,12 @@ class TCPTimeoutTest(SocketTCPTest):
if not ok:
self.fail("accept() returned success when we did not expect it")
@unittest.skipUnless(hasattr(signal, 'alarm'),
'test needs signal.alarm()')
def testInterruptedTimeout(self):
# XXX I don't know how to do this test on MSWindows or any other
# plaform that doesn't support signal.alarm() or os.kill(), though
# the bug should have existed on all platforms.
if not hasattr(signal, "alarm"):
return # can only test on *nix
self.serv.settimeout(5.0) # must be longer than alarm
class Alarm(Exception):
pass
......@@ -1428,6 +1555,7 @@ class TestExceptions(unittest.TestCase):
self.assertTrue(issubclass(socket.gaierror, socket.error))
self.assertTrue(issubclass(socket.timeout, socket.error))
@unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
class TestLinuxAbstractNamespace(unittest.TestCase):
UNIX_PATH_MAX = 108
......@@ -1523,6 +1651,23 @@ class BufferIOTest(SocketConnectedTest):
_testRecvFromIntoMemoryview = _testRecvFromIntoArray
def testRecvFromIntoSmallBuffer(self):
# See issue #20246.
buf = bytearray(8)
self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
def _testRecvFromIntoSmallBuffer(self):
with test_support.check_py3k_warnings():
buf = buffer(MSG)
self.serv_conn.send(buf)
def testRecvFromIntoEmptyBuffer(self):
buf = bytearray()
self.cli_conn.recvfrom_into(buf)
self.cli_conn.recvfrom_into(buf, 0)
_testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
TIPC_STYPE = 2000
TIPC_LOWER = 200
......@@ -1542,11 +1687,11 @@ def isTipcAvailable():
for line in f:
if line.startswith("tipc "):
return True
if test_support.verbose:
print "TIPC module is not loaded, please 'sudo modprobe tipc'"
return False
class TIPCTest (unittest.TestCase):
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCTest(unittest.TestCase):
def testRDM(self):
srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
......@@ -1566,7 +1711,9 @@ class TIPCTest (unittest.TestCase):
self.assertEqual(msg, MSG)
class TIPCThreadableTest (unittest.TestCase, ThreadableTest):
@unittest.skipUnless(isTipcAvailable(),
"TIPC module is not loaded, please 'sudo modprobe tipc'")
class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
def __init__(self, methodName = 'runTest'):
unittest.TestCase.__init__(self, methodName = methodName)
ThreadableTest.__init__(self)
......@@ -1619,13 +1766,9 @@ def test_main():
NetworkConnectionAttributesTest,
NetworkConnectionBehaviourTest,
])
if hasattr(socket, "socketpair"):
tests.append(BasicSocketPairTest)
if sys.platform == 'linux2':
tests.append(TestLinuxAbstractNamespace)
if isTipcAvailable():
tests.append(TIPCTest)
tests.append(TIPCThreadableTest)
tests.append(BasicSocketPairTest)
tests.append(TestLinuxAbstractNamespace)
tests.extend([TIPCTest, TIPCThreadableTest])
thread_info = test_support.threading_setup()
test_support.run_unittest(*tests)
......
......@@ -8,6 +8,8 @@ import os
import select
import signal
import socket
import select
import errno
import tempfile
import unittest
import SocketServer
......@@ -25,15 +27,21 @@ TEST_STR = "hello world\n"
HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
requires_unix_sockets = unittest.skipUnless(HAVE_UNIX_SOCKETS,
'requires Unix sockets')
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
requires_forking = unittest.skipUnless(HAVE_FORKING, 'requires forking')
def signal_alarm(n):
"""Call signal.alarm when it exists (i.e. not on Windows)."""
if hasattr(signal, 'alarm'):
signal.alarm(n)
# Remember real select() to avoid interferences with mocking
_real_select = select.select
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
r, w, x = _real_select([sock], [], [], timeout)
if sock in r:
return sock.recv(n)
else:
......@@ -53,7 +61,7 @@ if HAVE_UNIX_SOCKETS:
def simple_subprocess(testcase):
pid = os.fork()
if pid == 0:
# Don't throw an exception; it would be caught by the test harness.
# Don't raise an exception; it would be caught by the test harness.
os._exit(72)
yield None
pid2, status = os.waitpid(pid, 0)
......@@ -183,31 +191,33 @@ class SocketServerTest(unittest.TestCase):
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingTCPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_UNIX_SOCKETS:
def test_UnixStreamServer(self):
self.run_server(SocketServer.UnixStreamServer,
@requires_forking
def test_ForkingTCPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingTCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_ThreadingUnixStreamServer(self):
self.run_server(SocketServer.ThreadingUnixStreamServer,
@requires_unix_sockets
def test_UnixStreamServer(self):
self.run_server(SocketServer.UnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
def test_ThreadingUnixStreamServer(self):
self.run_server(SocketServer.ThreadingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
@requires_unix_sockets
@requires_forking
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
if HAVE_FORKING:
def test_ForkingUnixStreamServer(self):
with simple_subprocess(self):
self.run_server(ForkingUnixStreamServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
def test_UDPServer(self):
self.run_server(SocketServer.UDPServer,
SocketServer.DatagramRequestHandler,
......@@ -218,32 +228,66 @@ class SocketServerTest(unittest.TestCase):
SocketServer.DatagramRequestHandler,
self.dgram_examine)
if HAVE_FORKING:
def test_ForkingUDPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@requires_forking
def test_ForkingUDPServer(self):
with simple_subprocess(self):
self.run_server(SocketServer.ForkingUDPServer,
SocketServer.DatagramRequestHandler,
self.dgram_examine)
@contextlib.contextmanager
def mocked_select_module(self):
"""Mocks the select.select() call to raise EINTR for first call"""
old_select = select.select
class MockSelect:
def __init__(self):
self.called = 0
def __call__(self, *args):
self.called += 1
if self.called == 1:
# raise the exception on first call
raise select.error(errno.EINTR, os.strerror(errno.EINTR))
else:
# Return real select value for consecutive calls
return old_select(*args)
select.select = MockSelect()
try:
yield select.select
finally:
select.select = old_select
def test_InterruptServerSelectCall(self):
with self.mocked_select_module() as mock_select:
pid = self.run_server(SocketServer.TCPServer,
SocketServer.StreamRequestHandler,
self.stream_examine)
# Make sure select was called again:
self.assertGreater(mock_select.called, 1)
# Alas, on Linux (at least) recvfrom() doesn't return a meaningful
# client address so this cannot work:
# if HAVE_UNIX_SOCKETS:
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# def test_UnixDatagramServer(self):
# self.run_server(SocketServer.UnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# def test_ThreadingUnixDatagramServer(self):
# self.run_server(SocketServer.ThreadingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
#
# if HAVE_FORKING:
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
# @requires_unix_sockets
# @requires_forking
# def test_ForkingUnixDatagramServer(self):
# self.run_server(SocketServer.ForkingUnixDatagramServer,
# SocketServer.DatagramRequestHandler,
# self.dgram_examine)
@reap_threads
def test_shutdown(self):
......
......@@ -25,6 +25,7 @@ ssl = test_support.import_module("ssl")
HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
NULLBYTECERT = None
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
......@@ -95,12 +96,8 @@ class BasicSocketTests(unittest.TestCase):
sys.stdout.write("\n RAND_status is %d (%s)\n"
% (v, (v and "sufficient randomness") or
"insufficient randomness"))
try:
ssl.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
self.assertRaises(TypeError, ssl.RAND_egd, 1)
self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
ssl.RAND_add("this is a random string", 75.0)
def test_parse_cert(self):
......@@ -127,6 +124,35 @@ class BasicSocketTests(unittest.TestCase):
('DNS', 'projects.forum.nokia.com'))
)
def test_parse_cert_CVE_2013_4238(self):
p = ssl._ssl._test_decode_cert(NULLBYTECERT)
if test_support.verbose:
sys.stdout.write("\n" + pprint.pformat(p) + "\n")
subject = ((('countryName', 'US'),),
(('stateOrProvinceName', 'Oregon'),),
(('localityName', 'Beaverton'),),
(('organizationName', 'Python Software Foundation'),),
(('organizationalUnitName', 'Python Core Development'),),
(('commonName', 'null.python.org\x00example.org'),),
(('emailAddress', 'python-dev@python.org'),))
self.assertEqual(p['subject'], subject)
self.assertEqual(p['issuer'], subject)
if ssl.OPENSSL_VERSION_INFO >= (0, 9, 8):
san = (('DNS', 'altnull.python.org\x00example.com'),
('email', 'null@python.org\x00user@example.org'),
('URI', 'http://null.python.org\x00http://example.org'),
('IP Address', '192.0.2.1'),
('IP Address', '2001:DB8:0:0:0:0:0:1\n'))
else:
# OpenSSL 0.9.7 doesn't support IPv6 addresses in subjectAltName
san = (('DNS', 'altnull.python.org\x00example.com'),
('email', 'null@python.org\x00user@example.org'),
('URI', 'http://null.python.org\x00http://example.org'),
('IP Address', '192.0.2.1'),
('IP Address', '<invalid>'))
self.assertEqual(p['subjectAltName'], san)
def test_DER_to_PEM(self):
with open(SVN_PYTHON_ORG_ROOT_CERT, 'r') as f:
pem = f.read()
......@@ -166,9 +192,8 @@ class BasicSocketTests(unittest.TestCase):
self.assertTrue(s.startswith("OpenSSL {:d}.{:d}.{:d}".format(major, minor, fix)),
(s, t))
@test_support.requires_resource('network')
def test_ciphers(self):
if not test_support.is_resource_enabled('network'):
return
remote = ("svn.python.org", 443)
with test_support.transient_internet(remote[0]):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
......@@ -207,6 +232,13 @@ class BasicSocketTests(unittest.TestCase):
self.assertRaises(socket.error, ss.send, b'x')
self.assertRaises(socket.error, ss.sendto, b'x', ('0.0.0.0', 0))
def test_unsupported_dtls(self):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.addCleanup(s.close)
with self.assertRaises(NotImplementedError) as cx:
ssl.wrap_socket(s, cert_reqs=ssl.CERT_NONE)
self.assertEqual(str(cx.exception), "only stream sockets are supported")
class NetworkedTests(unittest.TestCase):
......@@ -283,6 +315,34 @@ class NetworkedTests(unittest.TestCase):
finally:
s.close()
def test_timeout_connect_ex(self):
# Issue #12065: on a timeout, connect_ex() should return the original
# errno (mimicking the behaviour of non-SSL sockets).
with test_support.transient_internet("svn.python.org"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT,
do_handshake_on_connect=False)
try:
s.settimeout(0.0000001)
rc = s.connect_ex(('svn.python.org', 443))
if rc == 0:
self.skipTest("svn.python.org responded too quickly")
self.assertIn(rc, (errno.EAGAIN, errno.EWOULDBLOCK))
finally:
s.close()
def test_connect_ex_error(self):
with test_support.transient_internet("svn.python.org"):
s = ssl.wrap_socket(socket.socket(socket.AF_INET),
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
try:
self.assertEqual(errno.ECONNREFUSED,
s.connect_ex(("svn.python.org", 444)))
finally:
s.close()
@unittest.skipIf(os.name == "nt", "Can't use a socket as a file under Windows")
def test_makefile_close(self):
# Issue #5238: creating a file-like object with makefile() shouldn't
......@@ -330,19 +390,24 @@ class NetworkedTests(unittest.TestCase):
def test_get_server_certificate(self):
with test_support.transient_internet("svn.python.org"):
pem = ssl.get_server_certificate(("svn.python.org", 443))
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23)
if not pem:
self.fail("No server certificate on svn.python.org:443!")
try:
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE)
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23,
ca_certs=CERTFILE)
except ssl.SSLError:
#should fail
pass
else:
self.fail("Got server certificate %s for svn.python.org!" % pem)
pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
pem = ssl.get_server_certificate(("svn.python.org", 443),
ssl.PROTOCOL_SSLv23,
ca_certs=SVN_PYTHON_ORG_ROOT_CERT)
if not pem:
self.fail("No server certificate on svn.python.org:443!")
if test_support.verbose:
......@@ -354,7 +419,8 @@ class NetworkedTests(unittest.TestCase):
# SHA256 was added in OpenSSL 0.9.8
if ssl.OPENSSL_VERSION_INFO < (0, 9, 8, 0, 15):
self.skipTest("SHA256 not available on %r" % ssl.OPENSSL_VERSION)
# NOTE: https://sha256.tbs-internet.com is another possible test host
self.skipTest("remote host needs SNI, only available on Python 3.2+")
# NOTE: https://sha2.hboeck.de is another possible test host
remote = ("sha256.tbs-internet.com", 443)
sha256_cert = os.path.join(os.path.dirname(__file__), "sha256.pem")
with test_support.transient_internet("sha256.tbs-internet.com"):
......@@ -993,7 +1059,7 @@ else:
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_OPTIONAL)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv2, True, ssl.CERT_REQUIRED)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, True)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv23, False)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_SSLv3, False)
try_protocol_combo(ssl.PROTOCOL_SSLv2, ssl.PROTOCOL_TLSv1, False)
......@@ -1337,15 +1403,21 @@ else:
def test_main(verbose=False):
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT
CERTFILE = test_support.findfile("keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = test_support.findfile(
global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT, NOKIACERT, NULLBYTECERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
os.path.dirname(__file__) or os.curdir,
"https_svn_python_org_root.pem")
NOKIACERT = test_support.findfile("nokia.pem")
NOKIACERT = os.path.join(os.path.dirname(__file__) or os.curdir,
"nokia.pem")
NULLBYTECERT = os.path.join(os.path.dirname(__file__) or os.curdir,
"nullbytecert.pem")
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT) or
not os.path.exists(NOKIACERT)):
not os.path.exists(NOKIACERT) or
not os.path.exists(NULLBYTECERT)):
raise test_support.TestFailed("Can't read certificate files!")
tests = [BasicTests, BasicSocketTests]
......
......@@ -14,6 +14,10 @@ try:
import resource
except ImportError:
resource = None
try:
import threading
except ImportError:
threading = None
mswindows = (sys.platform == "win32")
......@@ -58,6 +62,18 @@ class BaseTestCase(unittest.TestCase):
self.assertEqual(actual, expected, msg)
class PopenTestException(Exception):
pass
class PopenExecuteChildRaises(subprocess.Popen):
"""Popen subclass for testing cleanup of subprocess.PIPE filehandles when
_execute_child fails.
"""
def _execute_child(self, *args, **kwargs):
raise PopenTestException("Forced Exception for Test")
class ProcessTestCase(BaseTestCase):
def test_call_seq(self):
......@@ -138,16 +154,27 @@ class ProcessTestCase(BaseTestCase):
self.assertEqual(p.stdin, None)
def test_stdout_none(self):
# .stdout is None when not redirected
p = subprocess.Popen([sys.executable, "-c",
'print " this bit of output is from a '
'test of stdout in a different '
'process ..."'],
stdin=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdin.close)
# .stdout is None when not redirected, and the child's stdout will
# be inherited from the parent. In order to test this we run a
# subprocess in a subprocess:
# this_test
# \-- subprocess created by this test (parent)
# \-- subprocess created by the parent subprocess (child)
# The parent doesn't specify stdout, so the child will use the
# parent's stdout. This test checks that the message printed by the
# child goes to the parent stdout. The parent also checks that the
# child's stdout is None. See #11963.
code = ('import sys; from subprocess import Popen, PIPE;'
'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
' stdin=PIPE, stderr=PIPE);'
'p.wait(); assert p.stdout is None;')
p = subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
p.wait()
self.assertEqual(p.stdout, None)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out.rstrip(), 'test_stdout_none')
def test_stderr_none(self):
# .stderr is None when not redirected
......@@ -296,9 +323,22 @@ class ProcessTestCase(BaseTestCase):
def test_stdout_filedes_of_stdout(self):
# stdout is set to 1 (#1531862).
cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), '.\n'))"
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
self.assertEqual(rc, 2)
# To avoid printing the text on stdout, we do something similar to
# test_stdout_none (see above). The parent subprocess calls the child
# subprocess passing stdout=1, and this test uses stdout=PIPE in
# order to capture and check the output of the parent. See #11963.
code = ('import sys, subprocess; '
'rc = subprocess.call([sys.executable, "-c", '
' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
'\'test with stdout=1\'))"], stdout=1); '
'assert rc == 18')
p = subprocess.Popen([sys.executable, "-c", code],
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
out, err = p.communicate()
self.assertEqual(p.returncode, 0, err)
self.assertEqual(out.rstrip(), 'test with stdout=1')
def test_cwd(self):
tmpdir = tempfile.gettempdir()
......@@ -528,6 +568,7 @@ class ProcessTestCase(BaseTestCase):
finally:
for h in handles:
os.close(h)
test_support.unlink(test_support.TESTFN)
def test_list2cmdline(self):
self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
......@@ -594,6 +635,36 @@ class ProcessTestCase(BaseTestCase):
if c.exception.errno not in (errno.ENOENT, errno.EACCES):
raise c.exception
@unittest.skipIf(threading is None, "threading required")
def test_double_close_on_error(self):
# Issue #18851
fds = []
def open_fds():
for i in range(20):
fds.extend(os.pipe())
time.sleep(0.001)
t = threading.Thread(target=open_fds)
t.start()
try:
with self.assertRaises(EnvironmentError):
subprocess.Popen(['nonexisting_i_hope'],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
finally:
t.join()
exc = None
for fd in fds:
# If a double close occurred, some of those fds will
# already have been closed by mistake, and os.close()
# here will raise.
try:
os.close(fd)
except OSError as e:
exc = e
if exc is not None:
raise exc
def test_handles_closed_on_exception(self):
# If CreateProcess exits with an error, ensure the
# duplicate output handles are released
......@@ -633,6 +704,27 @@ class ProcessTestCase(BaseTestCase):
time.sleep(2)
p.communicate("x" * 2**20)
# This test is Linux-ish specific for simplicity to at least have
# some coverage. It is not a platform specific bug.
@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
"Linux specific")
def test_failed_child_execute_fd_leak(self):
"""Test for the fork() failure fd leak reported in issue16327."""
fd_directory = '/proc/%d/fd' % os.getpid()
fds_before_popen = os.listdir(fd_directory)
with self.assertRaises(PopenTestException):
PopenExecuteChildRaises(
[sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# NOTE: This test doesn't verify that the real _execute_child
# does not close the file descriptors itself on the way out
# during an exception. Code inspection has confirmed that.
fds_after_exception = os.listdir(fd_directory)
self.assertEqual(fds_before_popen, fds_after_exception)
# context manager
class _SuppressCoreFiles(object):
"""Try to prevent core files from being created."""
......@@ -719,6 +811,53 @@ class POSIXProcessTestCase(BaseTestCase):
self.addCleanup(p.stdout.close)
self.assertEqual(p.stdout.read(), "apple")
class _TestExecuteChildPopen(subprocess.Popen):
"""Used to test behavior at the end of _execute_child."""
def __init__(self, testcase, *args, **kwargs):
self._testcase = testcase
subprocess.Popen.__init__(self, *args, **kwargs)
def _execute_child(
self, args, executable, preexec_fn, close_fds, cwd, env,
universal_newlines, startupinfo, creationflags, shell, to_close,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite):
try:
subprocess.Popen._execute_child(
self, args, executable, preexec_fn, close_fds,
cwd, env, universal_newlines,
startupinfo, creationflags, shell, to_close,
p2cread, p2cwrite,
c2pread, c2pwrite,
errread, errwrite)
finally:
# Open a bunch of file descriptors and verify that
# none of them are the same as the ones the Popen
# instance is using for stdin/stdout/stderr.
devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
for _ in range(8)]
try:
for fd in devzero_fds:
self._testcase.assertNotIn(
fd, (p2cwrite, c2pread, errread))
finally:
for fd in devzero_fds:
os.close(fd)
@unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
def test_preexec_errpipe_does_not_double_close_pipes(self):
"""Issue16140: Don't double close pipes on preexec error."""
def raise_it():
raise RuntimeError("force the _execute_child() errpipe_data path.")
with self.assertRaises(RuntimeError):
self._TestExecuteChildPopen(
self, [sys.executable, "-c", "pass"],
stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, preexec_fn=raise_it)
def test_args_string(self):
# args is a string
f, fname = mkstemp()
......@@ -814,6 +953,29 @@ class POSIXProcessTestCase(BaseTestCase):
getattr(p, method)(*args)
return p
@unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
"Due to known OS bug (issue #16762)")
def _kill_dead_process(self, method, *args):
# Do not inherit file handles from the parent.
# It should fix failures on some platforms.
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, time
sys.stdout.write('x\\n')
sys.stdout.flush()
"""],
close_fds=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
# Wait for the interpreter to be completely initialized before
# sending any signal.
p.stdout.read(1)
# The process should end after this
time.sleep(1)
# This shouldn't raise even though the child is now dead
getattr(p, method)(*args)
p.communicate()
def test_send_signal(self):
p = self._kill_process('send_signal', signal.SIGINT)
_, stderr = p.communicate()
......@@ -832,6 +994,18 @@ class POSIXProcessTestCase(BaseTestCase):
self.assertStderrEqual(stderr, '')
self.assertEqual(p.wait(), -signal.SIGTERM)
def test_send_signal_dead(self):
# Sending a signal to a dead process
self._kill_dead_process('send_signal', signal.SIGINT)
def test_kill_dead(self):
# Killing a dead process
self._kill_dead_process('kill')
def test_terminate_dead(self):
# Terminating a dead process
self._kill_dead_process('terminate')
def check_close_std_fds(self, fds):
# Issue #9905: test that subprocess pipes still work properly with
# some standard fds closed
......@@ -1130,6 +1304,31 @@ class Win32ProcessTestCase(BaseTestCase):
returncode = p.wait()
self.assertNotEqual(returncode, 0)
def _kill_dead_process(self, method, *args):
p = subprocess.Popen([sys.executable, "-c", """if 1:
import sys, time
sys.stdout.write('x\\n')
sys.stdout.flush()
sys.exit(42)
"""],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
self.addCleanup(p.stdout.close)
self.addCleanup(p.stderr.close)
self.addCleanup(p.stdin.close)
# Wait for the interpreter to be completely initialized before
# sending any signal.
p.stdout.read(1)
# The process should end after this
time.sleep(1)
# This shouldn't raise even though the child is now dead
getattr(p, method)(*args)
_, stderr = p.communicate()
self.assertStderrEqual(stderr, b'')
rc = p.wait()
self.assertEqual(rc, 42)
def test_send_signal(self):
self._kill_process('send_signal', signal.SIGTERM)
......@@ -1139,6 +1338,15 @@ class Win32ProcessTestCase(BaseTestCase):
def test_terminate(self):
self._kill_process('terminate')
def test_send_signal_dead(self):
self._kill_dead_process('send_signal', signal.SIGTERM)
def test_kill_dead(self):
self._kill_dead_process('kill')
def test_terminate_dead(self):
self._kill_dead_process('terminate')
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
"poll system call not supported")
......
......@@ -3,6 +3,7 @@ import telnetlib
import time
import Queue
import unittest
from unittest import TestCase
from test import test_support
threading = test_support.import_module('threading')
......@@ -91,6 +92,14 @@ class GeneralTests(TestCase):
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testGetters(self):
# Test telnet getter methods
telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
t_sock = telnet.sock
self.assertEqual(telnet.get_socket(), t_sock)
self.assertEqual(telnet.fileno(), t_sock.fileno())
telnet.sock.close()
def _read_setUp(self):
self.evt = threading.Event()
self.dataq = Queue.Queue()
......@@ -135,6 +144,28 @@ class ReadTests(TestCase):
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), 'not seen')
def test_read_until_with_poll(self):
"""Use select.poll() to implement telnet.read_until()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
if not telnet._has_poll:
raise unittest.SkipTest('select.poll() is required')
telnet._has_poll = True
self.dataq.join()
data = telnet.read_until('match')
self.assertEqual(data, ''.join(want[:-2]))
def test_read_until_with_select(self):
"""Use select.select() to implement telnet.read_until()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
telnet._has_poll = False
self.dataq.join()
data = telnet.read_until('match')
self.assertEqual(data, ''.join(want[:-2]))
def test_read_all_A(self):
"""
read_all()
......@@ -146,7 +177,6 @@ class ReadTests(TestCase):
self.dataq.join()
data = telnet.read_all()
self.assertEqual(data, ''.join(want[:-1]))
return
def _test_blocking(self, func):
self.dataq.put([self.block_long, EOF_sigil])
......@@ -357,8 +387,75 @@ class OptionTests(TestCase):
self.assertEqual('', telnet.read_sb_data())
nego.sb_getter = None # break the nego => telnet cycle
class ExpectTests(TestCase):
def setUp(self):
self.evt = threading.Event()
self.dataq = Queue.Queue()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(10)
self.port = test_support.bind_port(self.sock)
self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
self.dataq))
self.thread.start()
self.evt.wait()
def tearDown(self):
self.thread.join()
# use a similar approach to testing timeouts as test_timeout.py
# these will never pass 100% but make the fuzz big enough that it is rare
block_long = 0.6
block_short = 0.3
def test_expect_A(self):
"""
expect(expected, [timeout])
Read until the expected string has been seen, or a timeout is
hit (default is no timeout); may block.
"""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_expect_B(self):
# test the timeout - it does NOT raise socket.timeout
want = ['hello', self.block_long, 'not seen', EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
self.dataq.join()
(_,_,data) = telnet.expect(['not seen'], self.block_short)
self.assertEqual(data, want[0])
self.assertEqual(telnet.read_all(), 'not seen')
def test_expect_with_poll(self):
"""Use select.poll() to implement telnet.expect()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
if not telnet._has_poll:
raise unittest.SkipTest('select.poll() is required')
telnet._has_poll = True
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_expect_with_select(self):
"""Use select.select() to implement telnet.expect()."""
want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
self.dataq.put(want)
telnet = telnetlib.Telnet(HOST, self.port)
telnet._has_poll = False
self.dataq.join()
(_,_,data) = telnet.expect(['match'])
self.assertEqual(data, ''.join(want[:-2]))
def test_main(verbose=None):
test_support.run_unittest(GeneralTests, ReadTests, OptionTests)
test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
ExpectTests)
if __name__ == '__main__':
test_main()
......@@ -7,7 +7,7 @@ import time
import sys
import weakref
import lock_tests
from test import lock_tests
NUMTASKS = 10
NUMTRIPS = 3
......@@ -70,39 +70,35 @@ class ThreadRunningTests(BasicThreadTest):
thread.stack_size(0)
self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
if os.name not in ("nt", "os2", "posix"):
return
tss_supported = True
@unittest.skipIf(os.name not in ("nt", "os2", "posix"), 'test meant for nt, os2, and posix')
def test_nt_and_posix_stack_size(self):
try:
thread.stack_size(4096)
except ValueError:
verbose_print("caught expected ValueError setting "
"stack_size(4096)")
except thread.error:
tss_supported = False
verbose_print("platform does not support changing thread stack "
"size")
if tss_supported:
fail_msg = "stack_size(%d) failed - should succeed"
for tss in (262144, 0x100000, 0):
thread.stack_size(tss)
self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
verbose_print("successfully set stack_size(%d)" % tss)
for tss in (262144, 0x100000):
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")
thread.stack_size(0)
self.skipTest("platform does not support changing thread stack "
"size")
fail_msg = "stack_size(%d) failed - should succeed"
for tss in (262144, 0x100000, 0):
thread.stack_size(tss)
self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
verbose_print("successfully set stack_size(%d)" % tss)
for tss in (262144, 0x100000):
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")
thread.stack_size(0)
def test__count(self):
# Test the _count() function.
......@@ -131,6 +127,29 @@ class ThreadRunningTests(BasicThreadTest):
test_support.gc_collect()
self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
# See issue #14474
def task():
started.release()
raise SyntaxError
def mywrite(self, *args):
try:
raise ValueError
except ValueError:
pass
real_write(self, *args)
c = thread._count()
started = thread.allocate_lock()
with test_support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
thread.start_new_thread(task, ())
started.acquire()
while thread._count() > c:
time.sleep(0.01)
self.assertIn("Traceback", stderr.getvalue())
class Barrier:
def __init__(self, num_threads):
......
# Very rudimentary test of threading module
import test.test_support
from test.test_support import verbose
from test.test_support import verbose, cpython_only
from test.script_helper import assert_python_ok
import random
import re
import sys
......@@ -12,8 +14,12 @@ import unittest
import weakref
import os
import subprocess
try:
import _testcapi
except ImportError:
_testcapi = None
import lock_tests
from test import lock_tests
# A trivial mutable counter.
class Counter(object):
......@@ -123,9 +129,7 @@ class ThreadTests(BaseTestCase):
try:
threading.stack_size(262144)
except thread.error:
if verbose:
print 'platform does not support changing thread stack size'
return
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
......@@ -136,9 +140,7 @@ class ThreadTests(BaseTestCase):
try:
threading.stack_size(0x100000)
except thread.error:
if verbose:
print 'platform does not support changing thread stack size'
return
self.skipTest('platform does not support changing thread stack size')
self.test_various_ops()
threading.stack_size(0)
......@@ -166,9 +168,7 @@ class ThreadTests(BaseTestCase):
try:
import ctypes
except ImportError:
if verbose:
print "test_PyThreadState_SetAsyncExc can't import ctypes"
return # can't do anything
self.skipTest('requires ctypes')
set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
......@@ -275,9 +275,7 @@ class ThreadTests(BaseTestCase):
try:
import ctypes
except ImportError:
if verbose:
print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything
self.skipTest('requires ctypes')
rc = subprocess.call([sys.executable, "-c", """if 1:
import ctypes, sys, time, thread
......@@ -417,6 +415,73 @@ class ThreadTests(BaseTestCase):
msg=('%d references still around' %
sys.getrefcount(weak_raising_cyclic_object())))
@unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
def test_dummy_thread_after_fork(self):
# Issue #14308: a dummy thread in the active list doesn't mess up
# the after-fork mechanism.
code = """if 1:
import thread, threading, os, time
def background_thread(evt):
# Creates and registers the _DummyThread instance
threading.current_thread()
evt.set()
time.sleep(10)
evt = threading.Event()
thread.start_new_thread(background_thread, (evt,))
evt.wait()
assert threading.active_count() == 2, threading.active_count()
if os.fork() == 0:
assert threading.active_count() == 1, threading.active_count()
os._exit(0)
else:
os.wait()
"""
_, out, err = assert_python_ok("-c", code)
self.assertEqual(out, '')
self.assertEqual(err, '')
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
def test_is_alive_after_fork(self):
# Try hard to trigger #18418: is_alive() could sometimes be True on
# threads that vanished after a fork.
old_interval = sys.getcheckinterval()
# Make the bug more likely to manifest.
sys.setcheckinterval(10)
try:
for i in range(20):
t = threading.Thread(target=lambda: None)
t.start()
pid = os.fork()
if pid == 0:
os._exit(1 if t.is_alive() else 0)
else:
t.join()
pid, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
finally:
sys.setcheckinterval(old_interval)
def test_BoundedSemaphore_limit(self):
# BoundedSemaphore should raise ValueError if released too often.
for limit in range(1, 10):
bs = threading.BoundedSemaphore(limit)
threads = [threading.Thread(target=bs.acquire)
for _ in range(limit)]
for t in threads:
t.start()
for t in threads:
t.join()
threads = [threading.Thread(target=bs.release)
for _ in range(limit)]
for t in threads:
t.start()
for t in threads:
t.join()
self.assertRaises(ValueError, bs.release)
class ThreadJoinOnShutdown(BaseTestCase):
......@@ -641,6 +706,49 @@ class ThreadJoinOnShutdown(BaseTestCase):
output = "end of worker thread\nend of main thread\n"
self.assertScriptHasOutput(script, output)
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_6_daemon_threads(self):
# Check that a daemon thread cannot crash the interpreter on shutdown
# by manipulating internal structures that are being disposed of in
# the main thread.
script = """if True:
import os
import random
import sys
import time
import threading
thread_has_run = set()
def random_io():
'''Loop for a while sleeping random tiny amounts and doing some I/O.'''
while True:
in_f = open(os.__file__, 'rb')
stuff = in_f.read(200)
null_f = open(os.devnull, 'wb')
null_f.write(stuff)
time.sleep(random.random() / 1995)
null_f.close()
in_f.close()
thread_has_run.add(threading.current_thread())
def main():
count = 0
for _ in range(40):
new_thread = threading.Thread(target=random_io)
new_thread.daemon = True
new_thread.start()
count += 1
while len(thread_has_run) < count:
time.sleep(0.001)
# Trigger process shutdown
sys.exit(0)
main()
"""
rc, out, err = assert_python_ok('-c', script)
self.assertFalse(err)
@unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
@unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
def test_reinit_tls_after_fork(self):
......@@ -665,6 +773,46 @@ class ThreadJoinOnShutdown(BaseTestCase):
for t in threads:
t.join()
@cpython_only
@unittest.skipIf(_testcapi is None, "need _testcapi module")
def test_frame_tstate_tracing(self):
# Issue #14432: Crash when a generator is created in a C thread that is
# destroyed while the generator is still used. The issue was that a
# generator contains a frame, and the frame kept a reference to the
# Python state of the destroyed C thread. The crash occurs when a trace
# function is setup.
def noop_trace(frame, event, arg):
# no operation
return noop_trace
def generator():
while 1:
yield "genereator"
def callback():
if callback.gen is None:
callback.gen = generator()
return next(callback.gen)
callback.gen = None
old_trace = sys.gettrace()
sys.settrace(noop_trace)
try:
# Install a trace function
threading.settrace(noop_trace)
# Create a generator in a C thread which exits after the call
_testcapi.call_in_temporary_c_thread(callback)
# Call the generator in a different Python thread, check that the
# generator didn't keep a reference to the destroyed thread state
for test in range(3):
# The trace function is still called here
callback()
finally:
sys.settrace(old_trace)
class ThreadingExceptionTests(BaseTestCase):
# A RuntimeError should be raised if Thread.start() is called
......
......@@ -178,16 +178,19 @@ class TimeoutTestCase(unittest.TestCase):
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
@unittest.skip('test not implemented')
def testSend(self):
# Test send() timeout
# couldn't figure out how to test it
pass
@unittest.skip('test not implemented')
def testSendto(self):
# Test sendto() timeout
# couldn't figure out how to test it
pass
@unittest.skip('test not implemented')
def testSendall(self):
# Test sendall() timeout
# couldn't figure out how to test it
......
......@@ -222,6 +222,27 @@ Content-Type: text/html; charset=iso-8859-1
finally:
self.unfakehttp()
def test_missing_localfile(self):
self.assertRaises(IOError, urllib.urlopen,
'file://localhost/a/missing/file.py')
fd, tmp_file = tempfile.mkstemp()
tmp_fileurl = 'file://localhost/' + tmp_file.replace(os.path.sep, '/')
self.assertTrue(os.path.exists(tmp_file))
try:
fp = urllib.urlopen(tmp_fileurl)
fp.close()
finally:
os.close(fd)
os.unlink(tmp_file)
self.assertFalse(os.path.exists(tmp_file))
self.assertRaises(IOError, urllib.urlopen, tmp_fileurl)
def test_ftp_nonexisting(self):
self.assertRaises(IOError, urllib.urlopen,
'ftp://localhost/not/existing/file.py')
def test_userpass_inurl(self):
self.fakehttp('Hello!')
try:
......@@ -768,6 +789,26 @@ class Utility_Tests(unittest.TestCase):
self.assertEqual(('user 2', 'ab'),urllib.splitpasswd('user 2:ab'))
self.assertEqual(('user+1', 'a+b'),urllib.splitpasswd('user+1:a+b'))
def test_splitport(self):
splitport = urllib.splitport
self.assertEqual(splitport('parrot:88'), ('parrot', '88'))
self.assertEqual(splitport('parrot'), ('parrot', None))
self.assertEqual(splitport('parrot:'), ('parrot', None))
self.assertEqual(splitport('127.0.0.1'), ('127.0.0.1', None))
self.assertEqual(splitport('parrot:cheese'), ('parrot:cheese', None))
def test_splitnport(self):
splitnport = urllib.splitnport
self.assertEqual(splitnport('parrot:88'), ('parrot', 88))
self.assertEqual(splitnport('parrot'), ('parrot', -1))
self.assertEqual(splitnport('parrot', 55), ('parrot', 55))
self.assertEqual(splitnport('parrot:'), ('parrot', -1))
self.assertEqual(splitnport('parrot:', 55), ('parrot', 55))
self.assertEqual(splitnport('127.0.0.1'), ('127.0.0.1', -1))
self.assertEqual(splitnport('127.0.0.1', 55), ('127.0.0.1', 55))
self.assertEqual(splitnport('parrot:cheese'), ('parrot', None))
self.assertEqual(splitnport('parrot:cheese', 55), ('parrot', None))
class URLopener_Tests(unittest.TestCase):
"""Testcase to test the open method of URLopener class."""
......@@ -791,7 +832,7 @@ class URLopener_Tests(unittest.TestCase):
# Everywhere else they work ok, but on those machines, sometimes
# fail in one of the tests, sometimes in other. I have a linux, and
# the tests go ok.
# If anybody has one of the problematic enviroments, please help!
# If anybody has one of the problematic environments, please help!
# . Facundo
#
# def server(evt):
......@@ -837,7 +878,7 @@ class URLopener_Tests(unittest.TestCase):
# def testTimeoutNone(self):
# # global default timeout is ignored
# import socket
# self.assertTrue(socket.getdefaulttimeout() is None)
# self.assertIsNone(socket.getdefaulttimeout())
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
......@@ -849,7 +890,7 @@ class URLopener_Tests(unittest.TestCase):
# def testTimeoutDefault(self):
# # global default timeout is used
# import socket
# self.assertTrue(socket.getdefaulttimeout() is None)
# self.assertIsNone(socket.getdefaulttimeout())
# socket.setdefaulttimeout(30)
# try:
# ftp = urllib.ftpwrapper("myuser", "mypass", "localhost", 9093, [])
......
......@@ -593,8 +593,8 @@ class OpenerDirectorTests(unittest.TestCase):
self.assertIsInstance(args[0], Request)
# response from opener.open is None, because there's no
# handler that defines http_open to handle it
self.assertTrue(args[1] is None or
isinstance(args[1], MockResponse))
if args[1] is not None:
self.assertIsInstance(args[1], MockResponse)
def sanepathname2url(path):
......@@ -926,7 +926,8 @@ class HandlerTests(unittest.TestCase):
MockHeaders({"location": to_url}))
except urllib2.HTTPError:
# 307 in response to POST requires user OK
self.assertTrue(code == 307 and data is not None)
self.assertEqual(code, 307)
self.assertIsNotNone(data)
self.assertEqual(o.req.get_full_url(), to_url)
try:
self.assertEqual(o.req.get_method(), "GET")
......@@ -1003,7 +1004,7 @@ class HandlerTests(unittest.TestCase):
# cookies shouldn't leak into redirected requests
from cookielib import CookieJar
from test_cookielib import interact_netscape
from test.test_cookielib import interact_netscape
cj = CookieJar()
interact_netscape(cj, "http://www.example.com/", "spam=eggs")
......@@ -1108,12 +1109,30 @@ class HandlerTests(unittest.TestCase):
self._test_basic_auth(opener, auth_handler, "Authorization",
realm, http_handler, password_manager,
"http://acme.example.com/protected",
"http://acme.example.com/protected",
)
"http://acme.example.com/protected"
)
def test_basic_auth_with_single_quoted_realm(self):
self.test_basic_auth(quote_char="'")
def test_basic_auth_with_unquoted_realm(self):
opener = OpenerDirector()
password_manager = MockPasswordManager()
auth_handler = urllib2.HTTPBasicAuthHandler(password_manager)
realm = "ACME Widget Store"
http_handler = MockHTTPHandler(
401, 'WWW-Authenticate: Basic realm=%s\r\n\r\n' % realm)
opener.add_handler(auth_handler)
opener.add_handler(http_handler)
msg = "Basic Auth Realm was unquoted"
with test_support.check_warnings((msg, UserWarning)):
self._test_basic_auth(opener, auth_handler, "Authorization",
realm, http_handler, password_manager,
"http://acme.example.com/protected",
"http://acme.example.com/protected"
)
def test_proxy_basic_auth(self):
opener = OpenerDirector()
ph = urllib2.ProxyHandler(dict(http="proxy.example.com:3128"))
......@@ -1132,7 +1151,7 @@ class HandlerTests(unittest.TestCase):
)
def test_basic_and_digest_auth_handlers(self):
# HTTPDigestAuthHandler threw an exception if it couldn't handle a 40*
# HTTPDigestAuthHandler raised an exception if it couldn't handle a 40*
# response (http://python.org/sf/1479302), where it should instead
# return None to allow another handler (especially
# HTTPBasicAuthHandler) to handle the response.
......@@ -1320,19 +1339,35 @@ class RequestTests(unittest.TestCase):
req = Request(url)
self.assertEqual(req.get_full_url(), url)
def test_HTTPError_interface():
"""
Issue 13211 reveals that HTTPError didn't implement the URLError
interface even though HTTPError is a subclass of URLError.
>>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
>>> assert hasattr(err, 'reason')
>>> err.reason
'something bad happened'
"""
def test_HTTPError_interface(self):
"""
Issue 13211 reveals that HTTPError didn't implement the URLError
interface even though HTTPError is a subclass of URLError.
>>> err = urllib2.HTTPError(msg='something bad happened', url=None, code=None, hdrs=None, fp=None)
>>> assert hasattr(err, 'reason')
>>> err.reason
'something bad happened'
"""
def test_HTTPError_interface_call(self):
"""
Issue 15701= - HTTPError interface has info method available from URLError.
"""
err = urllib2.HTTPError(msg='something bad happened', url=None,
code=None, hdrs='Content-Length:42', fp=None)
self.assertTrue(hasattr(err, 'reason'))
assert hasattr(err, 'reason')
assert hasattr(err, 'info')
assert callable(err.info)
try:
err.info()
except AttributeError:
self.fail("err.info() failed")
self.assertEqual(err.info(), "Content-Length:42")
def test_main(verbose=None):
import test_urllib2
from test import test_urllib2
test_support.run_doctest(test_urllib2, verbose)
test_support.run_doctest(urllib2, verbose)
tests = (TrivialTests,
......
#!/usr/bin/env python
import urlparse
import urllib2
import BaseHTTPServer
import unittest
import hashlib
from test import test_support
mimetools = test_support.import_module('mimetools', deprecated=True)
threading = test_support.import_module('threading')
......@@ -23,7 +23,7 @@ class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
# Set the timeout of our listening socket really low so
# that we can stop the server easily.
self.socket.settimeout(0.1)
self.socket.settimeout(1.0)
def get_request(self):
"""BaseHTTPServer method, overridden."""
......@@ -33,7 +33,7 @@ class LoopbackHttpServer(BaseHTTPServer.HTTPServer):
# It's a loopback connection, so setting the timeout
# really low shouldn't affect anything, but should make
# deadlocks less likely to occur.
request.settimeout(1.0)
request.settimeout(10.0)
return (request, client_address)
......@@ -346,6 +346,12 @@ class TestUrlopen(BaseTestCase):
for transparent redirection have been written.
"""
def setUp(self):
proxy_handler = urllib2.ProxyHandler({})
opener = urllib2.build_opener(proxy_handler)
urllib2.install_opener(opener)
super(TestUrlopen, self).setUp()
def start_server(self, responses):
handler = GetRequestHandler(responses)
......@@ -481,6 +487,11 @@ class TestUrlopen(BaseTestCase):
def test_bad_address(self):
# Make sure proper exception is raised when connecting to a bogus
# address.
# as indicated by the comment below, this might fail with some ISP,
# so we run the test only when -unetwork/-uall is specified to
# mitigate the problem a bit (see #17564)
test_support.requires('network')
self.assertRaises(IOError,
# Given that both VeriSign and various ISPs have in
# the past or are presently hijacking various invalid
......
#!/usr/bin/env python
import unittest
from test import test_support
from test_urllib2 import sanepathname2url
from test.test_urllib2 import sanepathname2url
import socket
import urllib2
......@@ -80,13 +78,13 @@ class CloseSocketTest(unittest.TestCase):
# underlying socket
# delve deep into response to fetch socket._socketobject
response = _urlopen_with_retry("http://www.python.org/")
response = _urlopen_with_retry("http://www.example.com/")
abused_fileobject = response.fp
self.assertTrue(abused_fileobject.__class__ is socket._fileobject)
self.assertIs(abused_fileobject.__class__, socket._fileobject)
httpresponse = abused_fileobject._sock
self.assertTrue(httpresponse.__class__ is httplib.HTTPResponse)
self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
fileobject = httpresponse.fp
self.assertTrue(fileobject.__class__ is socket._fileobject)
self.assertIs(fileobject.__class__, socket._fileobject)
self.assertTrue(not fileobject.closed)
response.close()
......@@ -157,15 +155,15 @@ class OtherNetworkTests(unittest.TestCase):
## self._test_urls(urls, self._extra_handlers()+[bauth, dauth])
def test_urlwithfrag(self):
urlwith_frag = "http://docs.python.org/glossary.html#glossary"
urlwith_frag = "https://docs.python.org/2/glossary.html#glossary"
with test_support.transient_internet(urlwith_frag):
req = urllib2.Request(urlwith_frag)
res = urllib2.urlopen(req)
self.assertEqual(res.geturl(),
"http://docs.python.org/glossary.html#glossary")
"https://docs.python.org/2/glossary.html#glossary")
def test_fileno(self):
req = urllib2.Request("http://www.python.org")
req = urllib2.Request("http://www.example.com")
opener = urllib2.build_opener()
res = opener.open(req)
try:
......@@ -252,15 +250,15 @@ class OtherNetworkTests(unittest.TestCase):
class TimeoutTest(unittest.TestCase):
def test_http_basic(self):
self.assertTrue(socket.getdefaulttimeout() is None)
url = "http://www.python.org"
self.assertIsNone(socket.getdefaulttimeout())
url = "http://www.example.com"
with test_support.transient_internet(url, timeout=None):
u = _urlopen_with_retry(url)
self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
def test_http_default_timeout(self):
self.assertTrue(socket.getdefaulttimeout() is None)
url = "http://www.python.org"
self.assertIsNone(socket.getdefaulttimeout())
url = "http://www.example.com"
with test_support.transient_internet(url):
socket.setdefaulttimeout(60)
try:
......@@ -270,18 +268,18 @@ class TimeoutTest(unittest.TestCase):
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)
def test_http_no_timeout(self):
self.assertTrue(socket.getdefaulttimeout() is None)
url = "http://www.python.org"
self.assertIsNone(socket.getdefaulttimeout())
url = "http://www.example.com"
with test_support.transient_internet(url):
socket.setdefaulttimeout(60)
try:
u = _urlopen_with_retry(url, timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertTrue(u.fp._sock.fp._sock.gettimeout() is None)
self.assertIsNone(u.fp._sock.fp._sock.gettimeout())
def test_http_timeout(self):
url = "http://www.python.org"
url = "http://www.example.com"
with test_support.transient_internet(url):
u = _urlopen_with_retry(url, timeout=120)
self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)
......@@ -289,13 +287,13 @@ class TimeoutTest(unittest.TestCase):
FTP_HOST = "ftp://ftp.mirror.nl/pub/gnu/"
def test_ftp_basic(self):
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
with test_support.transient_internet(self.FTP_HOST, timeout=None):
u = _urlopen_with_retry(self.FTP_HOST)
self.assertTrue(u.fp.fp._sock.gettimeout() is None)
self.assertIsNone(u.fp.fp._sock.gettimeout())
def test_ftp_default_timeout(self):
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout())
with test_support.transient_internet(self.FTP_HOST):
socket.setdefaulttimeout(60)
try:
......@@ -305,14 +303,14 @@ class TimeoutTest(unittest.TestCase):
self.assertEqual(u.fp.fp._sock.gettimeout(), 60)
def test_ftp_no_timeout(self):
self.assertTrue(socket.getdefaulttimeout() is None)
self.assertIsNone(socket.getdefaulttimeout(),)
with test_support.transient_internet(self.FTP_HOST):
socket.setdefaulttimeout(60)
try:
u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertTrue(u.fp.fp._sock.gettimeout() is None)
self.assertIsNone(u.fp.fp._sock.gettimeout())
def test_ftp_timeout(self):
with test_support.transient_internet(self.FTP_HOST):
......
......@@ -39,9 +39,6 @@ class MockHandler(WSGIRequestHandler):
pass
def hello_app(environ,start_response):
start_response("200 OK", [
('Content-Type','text/plain'),
......@@ -62,27 +59,6 @@ def run_amock(app=hello_app, data="GET / HTTP/1.0\n\n"):
return out.getvalue(), err.getvalue()
def compare_generic_iter(make_it,match):
"""Utility to compare a generic 2.1/2.2+ iterator with an iterable
......@@ -120,10 +96,6 @@ def compare_generic_iter(make_it,match):
raise AssertionError("Too many items from .next()",it)
class IntegrationTests(TestCase):
def check_hello(self, out, has_length=True):
......@@ -161,10 +133,6 @@ class IntegrationTests(TestCase):
)
class UtilityTests(TestCase):
def checkShift(self,sn_in,pi_in,part,sn_out,pi_out):
......@@ -187,7 +155,7 @@ class UtilityTests(TestCase):
# Check existing value
env = {key:alt}
util.setup_testing_defaults(env)
self.assertTrue(env[key] is alt)
self.assertIs(env[key], alt)
def checkCrossDefault(self,key,value,**kw):
util.setup_testing_defaults(kw)
......@@ -201,11 +169,6 @@ class UtilityTests(TestCase):
util.setup_testing_defaults(kw)
self.assertEqual(util.request_uri(kw,query),uri)
def checkFW(self,text,size,match):
def make_it(text=text,size=size):
......@@ -224,7 +187,6 @@ class UtilityTests(TestCase):
it.close()
self.assertTrue(it.filelike.closed)
def testSimpleShifts(self):
self.checkShift('','/', '', '/', '')
self.checkShift('','/x', 'x', '/x', '')
......@@ -232,7 +194,6 @@ class UtilityTests(TestCase):
self.checkShift('/a','/x/y', 'x', '/a/x', '/y')
self.checkShift('/a','/x/', 'x', '/a/x', '/')
def testNormalizedShifts(self):
self.checkShift('/a/b', '/../y', '..', '/a', '/y')
self.checkShift('', '/../y', '..', '', '/y')
......@@ -246,7 +207,6 @@ class UtilityTests(TestCase):
self.checkShift('/a/b', '/x//', 'x', '/a/b/x', '/')
self.checkShift('/a/b', '/.', None, '/a/b', '')
def testDefaults(self):
for key, value in [
('SERVER_NAME','127.0.0.1'),
......@@ -266,7 +226,6 @@ class UtilityTests(TestCase):
]:
self.checkDefault(key,value)
def testCrossDefaults(self):
self.checkCrossDefault('HTTP_HOST',"foo.bar",SERVER_NAME="foo.bar")
self.checkCrossDefault('wsgi.url_scheme',"https",HTTPS="on")
......@@ -276,7 +235,6 @@ class UtilityTests(TestCase):
self.checkCrossDefault('SERVER_PORT',"80",HTTPS="foo")
self.checkCrossDefault('SERVER_PORT',"443",HTTPS="on")
def testGuessScheme(self):
self.assertEqual(util.guess_scheme({}), "http")
self.assertEqual(util.guess_scheme({'HTTPS':"foo"}), "http")
......@@ -284,13 +242,10 @@ class UtilityTests(TestCase):
self.assertEqual(util.guess_scheme({'HTTPS':"yes"}), "https")
self.assertEqual(util.guess_scheme({'HTTPS':"1"}), "https")
def testAppURIs(self):
self.checkAppURI("http://127.0.0.1/")
self.checkAppURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
self.checkAppURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
self.checkAppURI("http://spam.example.com:2071/",
HTTP_HOST="spam.example.com:2071", SERVER_PORT="2071")
self.checkAppURI("http://spam.example.com/",
......@@ -304,14 +259,19 @@ class UtilityTests(TestCase):
def testReqURIs(self):
self.checkReqURI("http://127.0.0.1/")
self.checkReqURI("http://127.0.0.1/spam", SCRIPT_NAME="/spam")
self.checkReqURI("http://127.0.0.1/sp%E4m", SCRIPT_NAME="/sp\xe4m")
self.checkReqURI("http://127.0.0.1/spammity/spam",
SCRIPT_NAME="/spammity", PATH_INFO="/spam")
self.checkReqURI("http://127.0.0.1/spammity/sp%E4m",
SCRIPT_NAME="/spammity", PATH_INFO="/sp\xe4m")
self.checkReqURI("http://127.0.0.1/spammity/spam;ham",
SCRIPT_NAME="/spammity", PATH_INFO="/spam;ham")
self.checkReqURI("http://127.0.0.1/spammity/spam;cookie=1234,5678",
SCRIPT_NAME="/spammity", PATH_INFO="/spam;cookie=1234,5678")
self.checkReqURI("http://127.0.0.1/spammity/spam?say=ni",
SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
self.checkReqURI("http://127.0.0.1/spammity/spam?s%E4y=ni",
SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="s%E4y=ni")
self.checkReqURI("http://127.0.0.1/spammity/spam", 0,
SCRIPT_NAME="/spammity", PATH_INFO="/spam",QUERY_STRING="say=ni")
......@@ -342,7 +302,7 @@ class HeaderTests(TestCase):
self.assertEqual(Headers(test[:]).keys(), ['x'])
self.assertEqual(Headers(test[:]).values(), ['y'])
self.assertEqual(Headers(test[:]).items(), test)
self.assertFalse(Headers(test).items() is test) # must be copy!
self.assertIsNot(Headers(test).items(), test) # must be copy!
h=Headers([])
del h['foo'] # should not raise an error
......@@ -411,15 +371,6 @@ class TestHandler(ErrorHandler):
raise # for testing, we want to see what's happening
class HandlerTests(TestCase):
def checkEnvironAttrs(self, handler):
......@@ -460,7 +411,6 @@ class HandlerTests(TestCase):
h=TestHandler(); h.setup_environ()
self.assertEqual(h.environ['wsgi.url_scheme'],'http')
def testAbstractMethods(self):
h = BaseHandler()
for name in [
......@@ -469,7 +419,6 @@ class HandlerTests(TestCase):
self.assertRaises(NotImplementedError, getattr(h,name))
self.assertRaises(NotImplementedError, h._write, "test")
def testContentLength(self):
# Demo one reason iteration is better than write()... ;)
......@@ -549,7 +498,6 @@ class HandlerTests(TestCase):
"\r\n"+MSG)
self.assertNotEqual(h.stderr.getvalue().find("AssertionError"), -1)
def testHeaderFormats(self):
def non_error_app(e,s):
......@@ -591,40 +539,28 @@ class HandlerTests(TestCase):
(stdpat%(version,sw), h.stdout.getvalue())
)
# This epilogue is needed for compatibility with the Python 2.5 regrtest module
def testCloseOnError(self):
side_effects = {'close_called': False}
MSG = b"Some output has been sent"
def error_app(e,s):
s("200 OK",[])(MSG)
class CrashyIterable(object):
def __iter__(self):
while True:
yield b'blah'
raise AssertionError("This should be caught by handler")
def close(self):
side_effects['close_called'] = True
return CrashyIterable()
h = ErrorHandler()
h.run(error_app)
self.assertEqual(side_effects['close_called'], True)
def test_main():
test_support.run_unittest(__name__)
if __name__ == "__main__":
test_main()
# the above lines intentionally left blank
......@@ -83,7 +83,6 @@ if PYPY:
'test__server.py',
'test_subprocess.py', # test_executable_without_cwd
'FLAKY test___example_servers.py',
'test_httpservers.py',
'FLAKY test_queue.py',
]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment