Commit 1a0f3939 authored by Denis Bilenko's avatar Denis Bilenko

test__pywsgi.py: do not use urllib2 as that makes totalrefcount check fail

--HG--
extra : transplant_source : %BF1%93%5C%1Ck%7F%9DA%E8%09%CD%2A9%EB%D8%5C%7B8%B1
parent 6c9d2dec
# @author Donovan Preston
#
# Copyright (c) 2007, Linden Research, Inc.
# Copyright (c) 2009-2010 gevent contributors
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
......@@ -50,6 +49,9 @@ try:
except ImportError:
pass
REASONS = {200: 'OK',
500: 'Internal Server Error'}
class ConnectionClosed(Exception):
pass
......@@ -147,7 +149,7 @@ class Response(object):
if code is not None:
self.assertCode(code)
if reason == 'default':
reason = {200: 'OK'}.get(code)
reason = REASONS.get(code)
if reason is not None:
self.assertReason(reason)
if version is not None:
......@@ -235,11 +237,19 @@ class TestCase(greentest.TestCase):
def connect(self):
return socket.create_connection(('127.0.0.1', self.port))
def makefile(self):
return self.connect().makefile(bufsize=1)
def urlopen(self, *args, **kwargs):
fd = self.connect().makefile(bufsize=1)
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
return read_http(fd, *args, **kwargs)
class CommonTests(TestCase):
def test_basic(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, body='hello world')
if response.headers.get('Connection') == 'close' and not server_implements_pipeline:
......@@ -253,7 +263,7 @@ class CommonTests(TestCase):
def test_pipeline(self):
if not server_implements_pipeline:
return
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n' + 'GET /notexist HTTP/1.1\r\nHost: localhost\r\n\r\n')
read_http(fd, body='hello world')
exception = AssertionError('HTTP pipelining not supported; the second request is thrown away')
......@@ -269,7 +279,7 @@ class CommonTests(TestCase):
raise
def test_connection_close(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd)
if response.headers.get('Connection') == 'close' and not server_implements_pipeline:
......@@ -285,7 +295,7 @@ class CommonTests(TestCase):
raise
def SKIP_test_006_reject_long_urls(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
path_parts = []
for ii in range(3000):
path_parts.append('path')
......@@ -314,14 +324,14 @@ class TestNoChunks(CommonTests):
return ['not ', 'found']
def test(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, body='hello world')
assert response.chunks is None, response.chunks
response.assertHeader('Content-Length', '11')
if not server_implements_pipeline:
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET /not-found HTTP/1.1\r\nHost: localhost\r\n\r\n')
response = read_http(fd, code=404, reason='Not Found', body='not found')
......@@ -367,7 +377,7 @@ class TestGetArg(TestCase):
def test_007_get_arg(self):
# define a new handler that does a get_arg as well as a read_body
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
request = '\r\n'.join((
'POST / HTTP/1.0',
'Host: localhost',
......@@ -395,7 +405,7 @@ class TestChunkedApp(TestCase):
yield chunk
def test_chunked_response(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body())
if server_implements_chunked:
......@@ -407,7 +417,7 @@ class TestChunkedApp(TestCase):
self.assertEqual(response.chunks, None)
def test_no_chunked_http_1_0(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET / HTTP/1.0\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, version='1.0')
self.assertEqual(response.body, self.body())
......@@ -435,19 +445,19 @@ class TestChunkedPost(TestCase):
return [x for x in iter(lambda: env['wsgi.input'].read(1), '')]
def test_014_chunked_post(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('POST /a HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
read_http(fd, body='oh hai')
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('POST /b HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
read_http(fd, body='oh hai')
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('POST /c HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Transfer-Encoding: chunked\r\n\r\n'
'2\r\noh\r\n4\r\n hai\r\n0\r\n\r\n')
......@@ -477,14 +487,14 @@ class TestUseWrite(TestCase):
return [self.end]
def test_explicit_content_length(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET /explicit-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.end)
response.assertHeader('Content-Length', self.content_length)
response.assertHeader('Transfer-Encoding', None)
def test_no_content_length(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET /no-content-length HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.end)
if server_implements_chunked:
......@@ -494,7 +504,7 @@ class TestUseWrite(TestCase):
response.assertHeader('Content-Length', self.content_length)
def test_no_content_length_twice(self):
fd = self.connect().makefile(bufsize=1)
fd = self.makefile()
fd.write('GET /no-content-length-twice HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n')
response = read_http(fd, body=self.body + self.body + self.end)
if server_implements_chunked:
......@@ -513,9 +523,20 @@ class HttpsTestCase(TestCase):
def init_server(self, application):
self.server = self.get_wsgi_module().WSGIServer(('127.0.0.1', 0), application, certfile=self.certfile, keyfile=self.keyfile)
def urlopen(self, *args, **kwargs):
req = HTTPRequest("https://localhost:%s/foo" % self.server.server_port, *args, **kwargs)
return urllib2.urlopen(req)
def urlopen(self, method='GET', post_body=None, **kwargs):
import ssl
sock = self.connect()
sock = ssl.wrap_socket(sock)
fd = sock.makefile(bufsize=1)
fd.write('%s / HTTP/1.1\r\nHost: localhost\r\n' % method)
if post_body is not None:
fd.write('Content-Length: %s\r\n\r\n' % len(post_body))
fd.write(post_body)
if kwargs.get('body') is None:
kwargs['body'] = post_body
else:
fd.write('\r\n')
return read_http(fd, **kwargs)
def application(self, environ, start_response):
assert environ['wsgi.url_scheme'] == 'https', environ['wsgi.url_scheme']
......@@ -528,12 +549,12 @@ class TestHttps(HttpsTestCase):
if hasattr(socket, 'ssl'):
def test_012_ssl_server(self):
result = self.urlopen(method="POST", data='abc').read()
self.assertEquals(result, 'abc')
result = self.urlopen(method="POST", post_body='abc')
self.assertEquals(result.body, 'abc')
def test_013_empty_return(self):
result = self.urlopen().read()
self.assertEquals(result, '')
result = self.urlopen()
self.assertEquals(result.body, '')
class TestInternational(TestCase):
......@@ -569,7 +590,7 @@ class TestInputReadline(TestCase):
return lines
def test(self):
fd = self.connect().makefile()
fd = self.makefile()
content = 'hello\n\nworld\n123'
fd.write('POST / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n'
'Content-Length: %s\r\n\r\n%s' % (len(content), content))
......@@ -606,17 +627,8 @@ class TestError(TestCase):
def application(env, start_response):
raise greentest.ExpectedException('TestError.application')
@property
def url(self):
return 'http://127.0.0.1:%s' % self.port
def test(self):
try:
r = urllib2.urlopen(self.url)
raise AssertionError('Must raise HTTPError, returned %r: %s' % (r, r.code))
except urllib2.HTTPError, ex:
assert ex.code == 500, ex
assert ex.msg == 'Internal Server Error', ex
self.urlopen(code=500)
class TestError_after_start_response(TestError):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment