Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
24fc481b
Commit
24fc481b
authored
Aug 31, 2016
by
Jason Madden
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Pull in current versions of test_httplib.py and test_ssl.py for 3.4. Fixes #844.
parent
cd77375f
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
315 additions
and
38 deletions
+315
-38
src/greentest/3.4/test_httplib.py
src/greentest/3.4/test_httplib.py
+312
-34
src/greentest/3.4/test_ssl.py
src/greentest/3.4/test_ssl.py
+3
-4
No files found.
src/greentest/3.4/test_httplib.py
View file @
24fc481b
import
errno
from
http
import
client
import
io
import
itertools
import
os
import
array
import
socket
...
...
@@ -21,20 +22,34 @@ CERT_selfsigned_pythontestdotnet = os.path.join(here, 'selfsigned_pythontestdotn
HOST
=
support
.
HOST
class
FakeSocket
:
def
__init__
(
self
,
text
,
fileclass
=
io
.
BytesIO
):
def
__init__
(
self
,
text
,
fileclass
=
io
.
BytesIO
,
host
=
None
,
port
=
None
):
if
isinstance
(
text
,
str
):
text
=
text
.
encode
(
"ascii"
)
self
.
text
=
text
self
.
fileclass
=
fileclass
self
.
data
=
b''
self
.
sendall_calls
=
0
self
.
file_closed
=
False
self
.
host
=
host
self
.
port
=
port
def
sendall
(
self
,
data
):
self
.
sendall_calls
+=
1
self
.
data
+=
data
def
makefile
(
self
,
mode
,
bufsize
=
None
):
if
mode
!=
'r'
and
mode
!=
'rb'
:
raise
client
.
UnimplementedFileMode
()
return
self
.
fileclass
(
self
.
text
)
# keep the file around so we can check how much was read from it
self
.
file
=
self
.
fileclass
(
self
.
text
)
self
.
file
.
close
=
self
.
file_close
#nerf close ()
return
self
.
file
def
file_close
(
self
):
self
.
file_closed
=
True
def
close
(
self
):
pass
class
EPipeSocket
(
FakeSocket
):
...
...
@@ -45,7 +60,7 @@ class EPipeSocket(FakeSocket):
def
sendall
(
self
,
data
):
if
self
.
pipe_trigger
in
data
:
raise
socket
.
e
rror
(
errno
.
EPIPE
,
"gotcha"
)
raise
OSE
rror
(
errno
.
EPIPE
,
"gotcha"
)
self
.
data
+=
data
def
close
(
self
):
...
...
@@ -111,21 +126,59 @@ class HeaderTests(TestCase):
self
.
content_length
=
kv
[
1
].
strip
()
list
.
append
(
self
,
item
)
# POST with empty body
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
'POST'
,
'/'
,
''
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
b'0'
,
'Header Content-Length not set'
)
# PUT request with empty body
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
'PUT'
,
'/'
,
''
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
b'0'
,
'Header Content-Length not set'
)
# Here, we're testing that methods expecting a body get a
# content-length set to zero if the body is empty (either None or '')
bodies
=
(
None
,
''
)
methods_with_body
=
(
'PUT'
,
'POST'
,
'PATCH'
)
for
method
,
body
in
itertools
.
product
(
methods_with_body
,
bodies
):
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
method
,
'/'
,
body
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
b'0'
,
'Header Content-Length incorrect on {}'
.
format
(
method
)
)
# For these methods, we make sure that content-length is not set when
# the body is None because it might cause unexpected behaviour on the
# server.
methods_without_body
=
(
'GET'
,
'CONNECT'
,
'DELETE'
,
'HEAD'
,
'OPTIONS'
,
'TRACE'
,
)
for
method
in
methods_without_body
:
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
method
,
'/'
,
None
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
None
,
'Header Content-Length set for empty body on {}'
.
format
(
method
)
)
# If the body is set to '', that's considered to be "present but
# empty" rather than "missing", so content length would be set, even
# for methods that don't expect a body.
for
method
in
methods_without_body
:
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
method
,
'/'
,
''
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
b'0'
,
'Header Content-Length incorrect on {}'
.
format
(
method
)
)
# If the body is set, make sure Content-Length is set.
for
method
in
itertools
.
chain
(
methods_without_body
,
methods_with_body
):
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
None
)
conn
.
_buffer
=
ContentLengthChecker
()
conn
.
request
(
method
,
'/'
,
' '
)
self
.
assertEqual
(
conn
.
_buffer
.
content_length
,
b'1'
,
'Header Content-Length incorrect on {}'
.
format
(
method
)
)
def
test_putheader
(
self
):
conn
=
client
.
HTTPConnection
(
'example.com'
)
...
...
@@ -134,6 +187,33 @@ class HeaderTests(TestCase):
conn
.
putheader
(
'Content-length'
,
42
)
self
.
assertIn
(
b'Content-length: 42'
,
conn
.
_buffer
)
conn
.
putheader
(
'Foo'
,
' bar '
)
self
.
assertIn
(
b'Foo: bar '
,
conn
.
_buffer
)
conn
.
putheader
(
'Bar'
,
'
\
t
baz
\
t
'
)
self
.
assertIn
(
b'Bar:
\
t
baz
\
t
'
,
conn
.
_buffer
)
conn
.
putheader
(
'Authorization'
,
'Bearer mytoken'
)
self
.
assertIn
(
b'Authorization: Bearer mytoken'
,
conn
.
_buffer
)
conn
.
putheader
(
'IterHeader'
,
'IterA'
,
'IterB'
)
self
.
assertIn
(
b'IterHeader: IterA
\
r
\
n
\
t
IterB'
,
conn
.
_buffer
)
conn
.
putheader
(
'LatinHeader'
,
b'
\
xFF
'
)
self
.
assertIn
(
b'LatinHeader:
\
xFF
'
,
conn
.
_buffer
)
conn
.
putheader
(
'Utf8Header'
,
b'
\
xc3
\
x80
'
)
self
.
assertIn
(
b'Utf8Header:
\
xc3
\
x80
'
,
conn
.
_buffer
)
conn
.
putheader
(
'C1-Control'
,
b'next
\
x85
line'
)
self
.
assertIn
(
b'C1-Control: next
\
x85
line'
,
conn
.
_buffer
)
conn
.
putheader
(
'Embedded-Fold-Space'
,
'is
\
r
\
n
allowed'
)
self
.
assertIn
(
b'Embedded-Fold-Space: is
\
r
\
n
allowed'
,
conn
.
_buffer
)
conn
.
putheader
(
'Embedded-Fold-Tab'
,
'is
\
r
\
n
\
t
allowed'
)
self
.
assertIn
(
b'Embedded-Fold-Tab: is
\
r
\
n
\
t
allowed'
,
conn
.
_buffer
)
conn
.
putheader
(
'Key Space'
,
'value'
)
self
.
assertIn
(
b'Key Space: value'
,
conn
.
_buffer
)
conn
.
putheader
(
'KeySpace '
,
'value'
)
self
.
assertIn
(
b'KeySpace : value'
,
conn
.
_buffer
)
conn
.
putheader
(
b'Nonbreak
\
xa0
Space'
,
'value'
)
self
.
assertIn
(
b'Nonbreak
\
xa0
Space: value'
,
conn
.
_buffer
)
conn
.
putheader
(
b'
\
xa0
NonbreakSpace'
,
'value'
)
self
.
assertIn
(
b'
\
xa0
NonbreakSpace: value'
,
conn
.
_buffer
)
def
test_ipv6host_header
(
self
):
# Default host header on IPv6 transaction should wrapped by [] if
# its actual IPv6 address
...
...
@@ -153,6 +233,46 @@ class HeaderTests(TestCase):
conn
.
request
(
'GET'
,
'/foo'
)
self
.
assertTrue
(
sock
.
data
.
startswith
(
expected
))
def
test_malformed_headers_coped_with
(
self
):
# Issue 19996
body
=
"HTTP/1.1 200 OK
\
r
\
n
First: val
\
r
\
n
: nval
\
r
\
n
Second: val
\
r
\
n
\
r
\
n
"
sock
=
FakeSocket
(
body
)
resp
=
client
.
HTTPResponse
(
sock
)
resp
.
begin
()
self
.
assertEqual
(
resp
.
getheader
(
'First'
),
'val'
)
self
.
assertEqual
(
resp
.
getheader
(
'Second'
),
'val'
)
def
test_invalid_headers
(
self
):
conn
=
client
.
HTTPConnection
(
'example.com'
)
conn
.
sock
=
FakeSocket
(
''
)
conn
.
putrequest
(
'GET'
,
'/'
)
# http://tools.ietf.org/html/rfc7230#section-3.2.4, whitespace is no
# longer allowed in header names
cases
=
(
(
b'Invalid
\
r
\
n
Name'
,
b'ValidValue'
),
(
b'Invalid
\
r
Name'
,
b'ValidValue'
),
(
b'Invalid
\
n
Name'
,
b'ValidValue'
),
(
b'
\
r
\
n
InvalidName'
,
b'ValidValue'
),
(
b'
\
r
InvalidName'
,
b'ValidValue'
),
(
b'
\
n
InvalidName'
,
b'ValidValue'
),
(
b' InvalidName'
,
b'ValidValue'
),
(
b'
\
t
InvalidName'
,
b'ValidValue'
),
(
b'Invalid:Name'
,
b'ValidValue'
),
(
b':InvalidName'
,
b'ValidValue'
),
(
b'ValidName'
,
b'Invalid
\
r
\
n
Value'
),
(
b'ValidName'
,
b'Invalid
\
r
Value'
),
(
b'ValidName'
,
b'Invalid
\
n
Value'
),
(
b'ValidName'
,
b'InvalidValue
\
r
\
n
'
),
(
b'ValidName'
,
b'InvalidValue
\
r
'
),
(
b'ValidName'
,
b'InvalidValue
\
n
'
),
)
for
name
,
value
in
cases
:
with
self
.
subTest
((
name
,
value
)):
with
self
.
assertRaisesRegex
(
ValueError
,
'Invalid header'
):
conn
.
putheader
(
name
,
value
)
class
BasicTest
(
TestCase
):
def
test_status_lines
(
self
):
...
...
@@ -600,7 +720,7 @@ class BasicTest(TestCase):
b"Content-Length"
)
conn
=
client
.
HTTPConnection
(
"example.com"
)
conn
.
sock
=
sock
self
.
assertRaises
(
socket
.
e
rror
,
self
.
assertRaises
(
OSE
rror
,
lambda
:
conn
.
request
(
"PUT"
,
"/url"
,
"body"
))
resp
=
conn
.
getresponse
()
self
.
assertEqual
(
401
,
resp
.
status
)
...
...
@@ -646,7 +766,60 @@ class BasicTest(TestCase):
resp
.
close
()
self
.
assertTrue
(
resp
.
closed
)
def
test_delayed_ack_opt
(
self
):
# Test that Nagle/delayed_ack optimistaion works correctly.
# For small payloads, it should coalesce the body with
# headers, resulting in a single sendall() call
conn
=
client
.
HTTPConnection
(
'example.com'
)
sock
=
FakeSocket
(
None
)
conn
.
sock
=
sock
body
=
b'x'
*
(
conn
.
mss
-
1
)
conn
.
request
(
'POST'
,
'/'
,
body
)
self
.
assertEqual
(
sock
.
sendall_calls
,
1
)
# For large payloads, it should send the headers and
# then the body, resulting in more than one sendall()
# call
conn
=
client
.
HTTPConnection
(
'example.com'
)
sock
=
FakeSocket
(
None
)
conn
.
sock
=
sock
body
=
b'x'
*
conn
.
mss
conn
.
request
(
'POST'
,
'/'
,
body
)
self
.
assertGreater
(
sock
.
sendall_calls
,
1
)
def
test_error_leak
(
self
):
# Test that the socket is not leaked if getresponse() fails
conn
=
client
.
HTTPConnection
(
'example.com'
)
response
=
None
class
Response
(
client
.
HTTPResponse
):
def
__init__
(
self
,
*
pos
,
**
kw
):
nonlocal
response
response
=
self
# Avoid garbage collector closing the socket
client
.
HTTPResponse
.
__init__
(
self
,
*
pos
,
**
kw
)
conn
.
response_class
=
Response
conn
.
sock
=
FakeSocket
(
''
)
# Emulate server dropping connection
conn
.
request
(
'GET'
,
'/'
)
self
.
assertRaises
(
client
.
BadStatusLine
,
conn
.
getresponse
)
self
.
assertTrue
(
response
.
closed
)
self
.
assertTrue
(
conn
.
sock
.
file_closed
)
class
OfflineTest
(
TestCase
):
def
test_all
(
self
):
# Documented objects defined in the module should be in __all__
expected
=
{
"responses"
}
# White-list documented dict() object
# HTTPMessage, parse_headers(), and the HTTP status code constants are
# intentionally omitted for simplicity
blacklist
=
{
"HTTPMessage"
,
"parse_headers"
}
for
name
in
dir
(
client
):
if
name
in
blacklist
:
continue
module_object
=
getattr
(
client
,
name
)
if
getattr
(
module_object
,
"__module__"
,
None
)
==
"http.client"
:
expected
.
add
(
name
)
self
.
assertCountEqual
(
client
.
__all__
,
expected
)
def
test_responses
(
self
):
self
.
assertEqual
(
client
.
responses
[
client
.
NOT_FOUND
],
"Not Found"
)
...
...
@@ -743,19 +916,40 @@ class HTTPSTest(TestCase):
h
=
client
.
HTTPSConnection
(
HOST
,
TimeoutTest
.
PORT
,
timeout
=
30
)
self
.
assertEqual
(
h
.
timeout
,
30
)
def
_check_svn_python_org
(
self
,
resp
):
# Just a simple check that everything went fine
server_string
=
resp
.
getheader
(
'server'
)
self
.
assertIn
(
'Apache'
,
server_string
)
def
test_networked
(
self
):
# Default settings: no cert verification is done
# Default settings: requires a valid cert from a trusted CA
import
ssl
support
.
requires
(
'network'
)
with
support
.
transient_internet
(
'self-signed.pythontest.net'
):
h
=
client
.
HTTPSConnection
(
'self-signed.pythontest.net'
,
443
)
with
self
.
assertRaises
(
ssl
.
SSLError
)
as
exc_info
:
h
.
request
(
'GET'
,
'/'
)
self
.
assertEqual
(
exc_info
.
exception
.
reason
,
'CERTIFICATE_VERIFY_FAILED'
)
def
test_networked_noverification
(
self
):
# Switch off cert verification
import
ssl
support
.
requires
(
'network'
)
with
support
.
transient_internet
(
'svn.python.org'
):
h
=
client
.
HTTPSConnection
(
'svn.python.org'
,
443
)
with
support
.
transient_internet
(
'self-signed.pythontest.net'
):
context
=
ssl
.
_create_unverified_context
()
h
=
client
.
HTTPSConnection
(
'self-signed.pythontest.net'
,
443
,
context
=
context
)
h
.
request
(
'GET'
,
'/'
)
resp
=
h
.
getresponse
()
self
.
_check_svn_python_org
(
resp
)
h
.
close
()
self
.
assertIn
(
'nginx'
,
resp
.
getheader
(
'server'
))
@
support
.
system_must_validate_cert
def
test_networked_trusted_by_default_cert
(
self
):
# Default settings: requires a valid cert from a trusted CA
support
.
requires
(
'network'
)
with
support
.
transient_internet
(
'www.python.org'
):
h
=
client
.
HTTPSConnection
(
'www.python.org'
,
443
)
h
.
request
(
'GET'
,
'/'
)
resp
=
h
.
getresponse
()
content_type
=
resp
.
getheader
(
'content-type'
)
h
.
close
()
self
.
assertIn
(
'text/html'
,
content_type
)
def
test_networked_good_cert
(
self
):
# We feed the server's cert as a validating cert
...
...
@@ -769,19 +963,30 @@ class HTTPSTest(TestCase):
h
.
request
(
'GET'
,
'/'
)
resp
=
h
.
getresponse
()
server_string
=
resp
.
getheader
(
'server'
)
h
.
close
()
self
.
assertIn
(
'nginx'
,
server_string
)
def
test_networked_bad_cert
(
self
):
# We feed a "CA" cert that is unrelated to the server's cert
import
ssl
support
.
requires
(
'network'
)
with
support
.
transient_internet
(
's
vn.python.org
'
):
with
support
.
transient_internet
(
's
elf-signed.pythontest.net
'
):
context
=
ssl
.
SSLContext
(
ssl
.
PROTOCOL_TLSv1
)
context
.
verify_mode
=
ssl
.
CERT_REQUIRED
context
.
load_verify_locations
(
CERT_localhost
)
h
=
client
.
HTTPSConnection
(
's
vn.python.org
'
,
443
,
context
=
context
)
with
self
.
assertRaises
(
ssl
.
SSLError
):
h
=
client
.
HTTPSConnection
(
's
elf-signed.pythontest.net
'
,
443
,
context
=
context
)
with
self
.
assertRaises
(
ssl
.
SSLError
)
as
exc_info
:
h
.
request
(
'GET'
,
'/'
)
self
.
assertEqual
(
exc_info
.
exception
.
reason
,
'CERTIFICATE_VERIFY_FAILED'
)
def
test_local_unknown_cert
(
self
):
# The custom cert isn't known to the default trust bundle
import
ssl
server
=
self
.
make_server
(
CERT_localhost
)
h
=
client
.
HTTPSConnection
(
'localhost'
,
server
.
port
)
with
self
.
assertRaises
(
ssl
.
SSLError
)
as
exc_info
:
h
.
request
(
'GET'
,
'/'
)
self
.
assertEqual
(
exc_info
.
exception
.
reason
,
'CERTIFICATE_VERIFY_FAILED'
)
def
test_local_good_hostname
(
self
):
# The (valid) cert validates the HTTP hostname
...
...
@@ -794,7 +999,6 @@ class HTTPSTest(TestCase):
h
.
request
(
'GET'
,
'/nonexistent'
)
resp
=
h
.
getresponse
()
self
.
assertEqual
(
resp
.
status
,
404
)
del
server
def
test_local_bad_hostname
(
self
):
# The (valid) cert doesn't validate the HTTP hostname
...
...
@@ -802,6 +1006,7 @@ class HTTPSTest(TestCase):
server
=
self
.
make_server
(
CERT_fakehostname
)
context
=
ssl
.
SSLContext
(
ssl
.
PROTOCOL_TLSv1
)
context
.
verify_mode
=
ssl
.
CERT_REQUIRED
context
.
check_hostname
=
True
context
.
load_verify_locations
(
CERT_fakehostname
)
h
=
client
.
HTTPSConnection
(
'localhost'
,
server
.
port
,
context
=
context
)
with
self
.
assertRaises
(
ssl
.
CertificateError
):
...
...
@@ -812,12 +1017,24 @@ class HTTPSTest(TestCase):
with
self
.
assertRaises
(
ssl
.
CertificateError
):
h
.
request
(
'GET'
,
'/'
)
# With check_hostname=False, the mismatching is ignored
context
.
check_hostname
=
False
h
=
client
.
HTTPSConnection
(
'localhost'
,
server
.
port
,
context
=
context
,
check_hostname
=
False
)
h
.
request
(
'GET'
,
'/nonexistent'
)
resp
=
h
.
getresponse
()
self
.
assertEqual
(
resp
.
status
,
404
)
del
server
# The context's check_hostname setting is used if one isn't passed to
# HTTPSConnection.
context
.
check_hostname
=
False
h
=
client
.
HTTPSConnection
(
'localhost'
,
server
.
port
,
context
=
context
)
h
.
request
(
'GET'
,
'/nonexistent'
)
self
.
assertEqual
(
h
.
getresponse
().
status
,
404
)
# Passing check_hostname to HTTPSConnection should override the
# context's setting.
h
=
client
.
HTTPSConnection
(
'localhost'
,
server
.
port
,
context
=
context
,
check_hostname
=
True
)
with
self
.
assertRaises
(
ssl
.
CertificateError
):
h
.
request
(
'GET'
,
'/'
)
@
unittest
.
skipIf
(
not
hasattr
(
client
,
'HTTPSConnection'
),
'http.client.HTTPSConnection not available'
)
...
...
@@ -947,10 +1164,71 @@ class HTTPResponseTest(TestCase):
header
=
self
.
resp
.
getheader
(
'No-Such-Header'
,
default
=
42
)
self
.
assertEqual
(
header
,
42
)
class
TunnelTests
(
TestCase
):
def
setUp
(
self
):
response_text
=
(
'HTTP/1.0 200 OK
\
r
\
n
\
r
\
n
'
# Reply to CONNECT
'HTTP/1.1 200 OK
\
r
\
n
'
# Reply to HEAD
'Content-Length: 42
\
r
\
n
\
r
\
n
'
)
def
create_connection
(
address
,
timeout
=
None
,
source_address
=
None
):
return
FakeSocket
(
response_text
,
host
=
address
[
0
],
port
=
address
[
1
])
self
.
host
=
'proxy.com'
self
.
conn
=
client
.
HTTPConnection
(
self
.
host
)
self
.
conn
.
_create_connection
=
create_connection
def
tearDown
(
self
):
self
.
conn
.
close
()
def
test_set_tunnel_host_port_headers
(
self
):
tunnel_host
=
'destination.com'
tunnel_port
=
8888
tunnel_headers
=
{
'User-Agent'
:
'Mozilla/5.0 (compatible, MSIE 11)'
}
self
.
conn
.
set_tunnel
(
tunnel_host
,
port
=
tunnel_port
,
headers
=
tunnel_headers
)
self
.
conn
.
request
(
'HEAD'
,
'/'
,
''
)
self
.
assertEqual
(
self
.
conn
.
sock
.
host
,
self
.
host
)
self
.
assertEqual
(
self
.
conn
.
sock
.
port
,
client
.
HTTP_PORT
)
self
.
assertEqual
(
self
.
conn
.
_tunnel_host
,
tunnel_host
)
self
.
assertEqual
(
self
.
conn
.
_tunnel_port
,
tunnel_port
)
self
.
assertEqual
(
self
.
conn
.
_tunnel_headers
,
tunnel_headers
)
def
test_disallow_set_tunnel_after_connect
(
self
):
# Once connected, we shouldn't be able to tunnel anymore
self
.
conn
.
connect
()
self
.
assertRaises
(
RuntimeError
,
self
.
conn
.
set_tunnel
,
'destination.com'
)
def
test_connect_with_tunnel
(
self
):
self
.
conn
.
set_tunnel
(
'destination.com'
)
self
.
conn
.
request
(
'HEAD'
,
'/'
,
''
)
self
.
assertEqual
(
self
.
conn
.
sock
.
host
,
self
.
host
)
self
.
assertEqual
(
self
.
conn
.
sock
.
port
,
client
.
HTTP_PORT
)
self
.
assertIn
(
b'CONNECT destination.com'
,
self
.
conn
.
sock
.
data
)
# issue22095
self
.
assertNotIn
(
b'Host: destination.com:None'
,
self
.
conn
.
sock
.
data
)
self
.
assertIn
(
b'Host: destination.com'
,
self
.
conn
.
sock
.
data
)
# This test should be removed when CONNECT gets the HTTP/1.1 blessing
self
.
assertNotIn
(
b'Host: proxy.com'
,
self
.
conn
.
sock
.
data
)
def
test_connect_put_request
(
self
):
self
.
conn
.
set_tunnel
(
'destination.com'
)
self
.
conn
.
request
(
'PUT'
,
'/'
,
''
)
self
.
assertEqual
(
self
.
conn
.
sock
.
host
,
self
.
host
)
self
.
assertEqual
(
self
.
conn
.
sock
.
port
,
client
.
HTTP_PORT
)
self
.
assertIn
(
b'CONNECT destination.com'
,
self
.
conn
.
sock
.
data
)
self
.
assertIn
(
b'Host: destination.com'
,
self
.
conn
.
sock
.
data
)
@
support
.
reap_threads
def
test_main
(
verbose
=
None
):
support
.
run_unittest
(
HeaderTests
,
OfflineTest
,
BasicTest
,
TimeoutTest
,
HTTPSTest
,
RequestBodyTest
,
SourceAddressTest
,
HTTPResponseTest
)
HTTPResponseTest
,
TunnelTests
)
if
__name__
==
'__main__'
:
test_main
()
src/greentest/3.4/test_ssl.py
View file @
24fc481b
...
...
@@ -686,7 +686,8 @@ class ContextTests(unittest.TestCase):
self
.
assertEqual
(
ssl
.
OP_ALL
|
ssl
.
OP_NO_TLSv1
|
ssl
.
OP_NO_SSLv3
,
ctx
.
options
)
ctx
.
options
=
0
self
.
assertEqual
(
0
,
ctx
.
options
)
# Ubuntu has OP_NO_SSLv3 forced on by default
self
.
assertEqual
(
0
,
ctx
.
options
&
~
ssl
.
OP_NO_SSLv3
)
else
:
with
self
.
assertRaises
(
ValueError
):
ctx
.
options
=
0
...
...
@@ -1267,7 +1268,7 @@ class NetworkedTests(unittest.TestCase):
# Issue #19919: Windows machines or VMs hosted on Windows
# machines sometimes return EWOULDBLOCK.
errors
=
(
errno
.
ECONNREFUSED
,
errno
.
EHOSTUNREACH
,
errno
.
ECONNREFUSED
,
errno
.
EHOSTUNREACH
,
errno
.
ETIMEDOUT
,
errno
.
EWOULDBLOCK
,
)
self
.
assertIn
(
rc
,
errors
)
...
...
@@ -2292,7 +2293,6 @@ else:
chatty
=
True
,
connectionchatty
=
True
)
wrapped
=
False
with
server
:
s
=
socket
.
socket
()
s
.
setblocking
(
1
)
...
...
@@ -2309,7 +2309,6 @@ else:
else
:
s
.
send
(
indata
)
outdata
=
s
.
recv
(
1024
)
msg
=
outdata
.
strip
().
lower
()
if
indata
==
b"STARTTLS"
and
msg
.
startswith
(
b"ok"
):
# STARTTLS ok, switch to secure mode
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment