Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
4dfd64c8
Commit
4dfd64c8
authored
Sep 08, 2009
by
Denis Bilenko
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update wsgi_test.py to be reusable by wsgi2
parent
ade50706
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
126 additions
and
133 deletions
+126
-133
greentest/wsgi_test.py
greentest/wsgi_test.py
+126
-133
No files found.
greentest/wsgi_test.py
View file @
4dfd64c8
...
...
@@ -29,7 +29,6 @@ import urllib2
import
greentest
import
gevent
from
gevent
import
wsgi
from
gevent
import
socket
...
...
@@ -39,82 +38,7 @@ except ImportError:
from
StringIO
import
StringIO
def
hello_world
(
env
,
start_response
):
if
env
[
'PATH_INFO'
]
==
'notexist'
:
start_response
(
'404 Not Found'
,
[(
'Content-type'
,
'text/plain'
)])
return
[
"not found"
]
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
return
[
"hello world"
]
def
hello_world_explicit_content_length
(
env
,
start_response
):
if
env
[
'PATH_INFO'
]
==
'notexist'
:
msg
=
'not found'
start_response
(
'404 Not Found'
,
[(
'Content-type'
,
'text/plain'
),
(
'Content-Length'
,
len
(
msg
))])
return
[
msg
]
msg
=
'hello world'
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
),
(
'Content-Length'
,
len
(
msg
))])
return
[
msg
]
def
hello_world_yield
(
env
,
start_response
):
if
env
[
'PATH_INFO'
]
==
'notexist'
:
start_response
(
'404 Not Found'
,
[(
'Content-type'
,
'text/plain'
)])
yield
"not found"
else
:
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
yield
"hello world"
def
chunked_app
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
yield
"this"
yield
"is"
yield
"chunked"
def
big_chunks
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
line
=
'a'
*
8192
for
x
in
range
(
10
):
yield
line
def
use_write
(
env
,
start_response
):
if
env
[
'PATH_INFO'
]
==
'/a'
:
write
=
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
),
(
'Content-Length'
,
'5'
)])
write
(
'abcde'
)
if
env
[
'PATH_INFO'
]
==
'/b'
:
write
=
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
write
(
'abcde'
)
return
[]
def
chunked_post
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-type'
,
'text/plain'
)])
if
env
[
'PATH_INFO'
]
==
'/a'
:
return
[
env
[
'wsgi.input'
].
read
()]
elif
env
[
'PATH_INFO'
]
==
'/b'
:
return
[
x
for
x
in
iter
(
lambda
:
env
[
'wsgi.input'
].
read
(
4096
),
''
)]
elif
env
[
'PATH_INFO'
]
==
'/c'
:
return
[
x
for
x
in
iter
(
lambda
:
env
[
'wsgi.input'
].
read
(
1
),
''
)]
class
Site
(
object
):
def
__init__
(
self
,
application
):
self
.
application
=
application
def
__call__
(
self
,
env
,
start_response
):
return
self
.
application
(
env
,
start_response
)
CONTENT_LENGTH
=
'content-length'
CONTENT_LENGTH
=
'Content-Length'
class
ConnectionClosed
(
Exception
):
...
...
@@ -135,9 +59,8 @@ def read_headers(fd):
except
:
print
'Failed to split: %r'
%
(
line
,
)
raise
key
=
key
.
lower
()
assert
key
not
in
headers
,
'Header %r is sent more than once'
%
key
headers
[
key
.
lower
()]
=
value
assert
key
.
lower
()
not
in
[
x
.
lower
()
for
x
in
headers
.
keys
()],
'Header %r is sent more than once: %r'
%
(
key
,
headers
)
headers
[
key
]
=
value
return
response_line
,
headers
...
...
@@ -162,12 +85,13 @@ def iread_chunks(fd):
def
read_http
(
fd
):
response_line
,
headers
=
read_headers
(
fd
)
if
CONTENT_LENGTH
in
headers
:
if
'chunked'
in
headers
.
get
(
'Transfer-Encoding'
,
''
):
if
CONTENT_LENGTH
in
headers
:
print
"WARNING: server used chunked transfer-encoding despite having Content-Length header (libevent 1.x's bug)"
body
=
''
.
join
(
iread_chunks
(
fd
))
elif
CONTENT_LENGTH
in
headers
:
num
=
int
(
headers
[
CONTENT_LENGTH
])
body
=
fd
.
read
(
num
)
#print body
elif
'chunked'
in
headers
.
get
(
'transfer-encoding'
,
''
):
body
=
''
.
join
(
iread_chunks
(
fd
))
else
:
body
=
None
...
...
@@ -176,42 +100,60 @@ def read_http(fd):
class
TestCase
(
greentest
.
TestCase
):
def
listen
(
self
):
return
socket
.
tcp_listener
((
'0.0.0.0'
,
0
))
def
get_wsgi_module
(
self
):
from
gevent
import
wsgi
return
wsgi
def
setUp
(
self
):
self
.
logfile
=
sys
.
stderr
# StringIO()
self
.
site
=
Site
(
self
.
application
)
listener
=
self
.
listen
()
self
.
port
=
listener
.
getsockname
()[
1
]
self
.
server
=
gevent
.
spawn
(
wsgi
.
server
,
listener
,
self
.
site
,
max_size
=
128
,
log
=
self
.
logfile
)
self
.
server
=
self
.
get_wsgi_module
().
WSGIServer
((
'127.0.0.1'
,
0
),
self
.
application
)
self
.
server
.
start
()
self
.
port
=
self
.
server
.
server_port
def
tearDown
(
self
):
self
.
server
.
kill
(
block
=
True
)
# XXX server should have 'close' method which closes everything reliably
# XXX currently listening socket is kept open
timeout
=
gevent
.
Timeout
.
start_new
(
0.5
)
try
:
self
.
server
.
stop
()
finally
:
timeout
.
cancel
()
# XXX currently listening socket is kept open in gevent.wsgi
class
TestHttpdBasic
(
TestCase
):
application
=
staticmethod
(
hello_world
)
@
staticmethod
def
application
(
env
,
start_response
):
path
=
env
[
'PATH_INFO'
]
if
path
==
'/'
:
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
return
[
"hello world"
]
else
:
start_response
(
'404 Not Found'
,
[(
'Content-Type'
,
'text/plain'
)])
return
[
"not found"
]
def
test_001_server
(
self
):
sock
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
))
sock
.
sendall
(
'GET / HTTP/1.
0
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
sock
.
sendall
(
'GET / HTTP/1.
1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
\
r
\
n
'
)
result
=
sock
.
makefile
().
read
()
sock
.
close
()
## The server responds with the maximum version it supports
self
.
assert_
(
result
.
startswith
(
'HTTP'
),
result
)
self
.
assert_
(
result
.
endswith
(
'hello world'
))
self
.
assert_
(
result
.
startswith
(
'HTTP/1.1 200 OK
\
r
\
n
'
),
result
)
self
.
assert_
(
result
.
endswith
(
'hello world'
),
result
)
def
test_002_keepalive
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
read_http
(
fd
)
fd
.
write
(
'GET /notexist HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
fd
.
close
()
def
test_0021_keepalive
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
read_http
(
fd
)
fd
.
write
(
'GET /notexist HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
firstline
,
headers
,
body
=
read_http
(
fd
)
assert
firstline
==
'HTTP/1.1 200 OK
\
r
\
n
'
,
repr
(
firstline
)
assert
body
==
'hello world'
,
repr
(
body
)
firstline
,
header
,
body
=
read_http
(
fd
)
assert
firstline
==
'HTTP/1.1 404 Not Found
\
r
\
n
'
,
repr
(
firstline
)
assert
body
==
'not found'
,
repr
(
body
)
fd
.
close
()
def
test_003_passing_non_int_to_read
(
self
):
...
...
@@ -268,20 +210,40 @@ class TestHttpdBasic(TestCase):
class
TestExplicitContentLength
(
TestHttpdBasic
):
application
=
staticmethod
(
hello_world_explicit_content_length
)
@
staticmethod
def
application
(
env
,
start_response
):
path
=
env
[
'PATH_INFO'
]
if
path
==
'/'
:
msg
=
'hello world'
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
),
(
'Content-Length'
,
len
(
msg
))])
else
:
msg
=
'not found'
start_response
(
'404 Not Found'
,
[(
'Content-Type'
,
'text/plain'
),
(
'Content-Length'
,
len
(
msg
))])
return
[
msg
]
class
TestYield
(
TestHttpdBasic
):
application
=
staticmethod
(
hello_world_yield
)
@
staticmethod
def
hello_world_yield
(
env
,
start_response
):
path
=
env
[
'PATH_INFO'
]
if
path
==
'/'
:
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
yield
"hello world"
else
:
start_response
(
'404 Not Found'
,
[(
'Content-Type'
,
'text/plain'
)])
yield
"not found"
class
TestGetArg
(
TestCase
):
def
application
(
self
,
env
,
start_response
):
@
staticmethod
def
application
(
env
,
start_response
):
body
=
env
[
'wsgi.input'
].
read
()
a
=
cgi
.
parse_qs
(
body
).
get
(
'a'
,
[
1
])[
0
]
start_response
(
'200 OK'
,
[(
'Content-
t
ype'
,
'text/plain'
)])
start_response
(
'200 OK'
,
[(
'Content-
T
ype'
,
'text/plain'
)])
return
[
'a is %s, body is %s'
%
(
a
,
body
)]
def
test_007_get_arg
(
self
):
...
...
@@ -304,7 +266,12 @@ class TestGetArg(TestCase):
class
TestChunkedApp
(
TestCase
):
application
=
staticmethod
(
chunked_app
)
@
staticmethod
def
application
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
yield
"this"
yield
"is"
yield
"chunked"
def
test_009_chunked_response
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
...
...
@@ -318,14 +285,20 @@ class TestChunkedApp(TestCase):
class
TestBigChunks
(
TestCase
):
application
=
staticmethod
(
big_chunks
)
@
staticmethod
def
application
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
line
=
'a'
*
8192
for
x
in
range
(
10
):
yield
line
def
test_011_multiple_chunks
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
\
r
\
n
'
)
_
,
headers
=
read_headers
(
fd
)
assert
(
'
transfer-e
ncoding'
,
'chunked'
)
in
headers
.
items
(),
headers
assert
(
'
Transfer-E
ncoding'
,
'chunked'
)
in
headers
.
items
(),
headers
chunks
=
0
chunklen
=
int
(
fd
.
readline
(),
16
)
while
chunklen
:
...
...
@@ -338,53 +311,73 @@ class TestBigChunks(TestCase):
class
TestChunkedPost
(
TestCase
):
application
=
staticmethod
(
chunked_post
)
@
staticmethod
def
application
(
env
,
start_response
):
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
if
env
[
'PATH_INFO'
]
==
'/a'
:
data
=
env
[
'wsgi.input'
].
read
()
return
[
data
]
elif
env
[
'PATH_INFO'
]
==
'/b'
:
return
[
x
for
x
in
iter
(
lambda
:
env
[
'wsgi.input'
].
read
(
4096
),
''
)]
elif
env
[
'PATH_INFO'
]
==
'/c'
:
return
[
x
for
x
in
iter
(
lambda
:
env
[
'wsgi.input'
].
read
(
1
),
''
)]
def
test_014_chunked_post
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'P
U
T /a HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
fd
.
write
(
'P
OS
T /a HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
'Transfer-Encoding: chunked
\
r
\
n
\
r
\
n
'
'2
\
r
\
n
oh
\
r
\
n
4
\
r
\
n
hai
\
r
\
n
0
\
r
\
n
\
r
\
n
'
)
read_headers
(
fd
)
response
=
fd
.
read
()
self
.
assert_
(
response
==
'oh hai'
,
'invalid response %s'
%
response
)
response_line
,
headers
,
body
=
read_http
(
fd
)
self
.
assert_
(
body
==
'oh hai'
,
'invalid response %r'
%
body
)
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'P
U
T /b HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
fd
.
write
(
'P
OS
T /b HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
'Transfer-Encoding: chunked
\
r
\
n
\
r
\
n
'
'2
\
r
\
n
oh
\
r
\
n
4
\
r
\
n
hai
\
r
\
n
0
\
r
\
n
\
r
\
n
'
)
read_headers
(
fd
)
response
=
fd
.
read
()
self
.
assert_
(
response
==
'oh hai'
,
'invalid response %s'
%
response
)
response_line
,
headers
,
body
=
read_http
(
fd
)
self
.
assert_
(
body
==
'oh hai'
,
'invalid response %r'
%
body
)
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'P
U
T /c HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
fd
.
write
(
'P
OS
T /c HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
'
'Transfer-Encoding: chunked
\
r
\
n
\
r
\
n
'
'2
\
r
\
n
oh
\
r
\
n
4
\
r
\
n
hai
\
r
\
n
0
\
r
\
n
\
r
\
n
'
)
#fd.readuntil('\r\n\r\n')
read_headers
(
fd
)
response
=
fd
.
read
(
8192
)
self
.
assert_
(
response
==
'oh hai'
,
'invalid response %s'
%
response
)
response_line
,
headers
,
body
=
read_http
(
fd
)
self
.
assert_
(
body
==
'oh hai'
,
'invalid response %r'
%
body
)
class
TestUseWrite
(
TestCase
):
application
=
staticmethod
(
use_write
)
@
staticmethod
def
application
(
env
,
start_response
):
if
env
[
'PATH_INFO'
]
==
'/a'
:
write
=
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
),
(
'Content-Length'
,
'5'
)])
write
(
'abcde'
)
if
env
[
'PATH_INFO'
]
==
'/b'
:
write
=
start_response
(
'200 OK'
,
[(
'Content-Type'
,
'text/plain'
)])
write
(
'abcde'
)
return
[]
def
test_015_write
(
self
):
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'GET /a HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
\
r
\
n
'
)
response_line
,
headers
,
body
=
read_http
(
fd
)
self
.
assert_
(
'
content-l
ength'
in
headers
)
self
.
assert_
(
'
Content-L
ength'
in
headers
)
fd
=
socket
.
connect_tcp
((
'127.0.0.1'
,
self
.
port
)).
makefile
(
bufsize
=
1
)
fd
.
write
(
'GET /b HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
Connection: close
\
r
\
n
\
r
\
n
'
)
response_line
,
headers
,
body
=
read_http
(
fd
)
self
.
assert_
(
'transfer-encoding'
in
headers
)
self
.
assert_
(
headers
[
'transfer-encoding'
]
==
'chunked'
)
#self.assert_('Transfer-Encoding' in headers)
#self.assert_(headers['Transfer-Encoding'] == 'chunked')
class
TestHttps
(
TestCase
):
class
TestHttps
(
greentest
.
TestCase
):
def
setUp
(
self
):
pass
def
tearDown
(
self
):
pass
def
application
(
self
,
environ
,
start_response
):
start_response
(
'200 OK'
,
{})
...
...
@@ -396,7 +389,7 @@ class TestHttps(greentest.TestCase):
sock
=
socket
.
ssl_listener
((
''
,
4201
),
private_key_file
,
certificate_file
)
g
=
gevent
.
spawn
(
wsgi
.
server
,
sock
,
self
.
application
)
g
=
gevent
.
spawn
(
self
.
get_wsgi_module
()
.
server
,
sock
,
self
.
application
)
try
:
req
=
HTTPRequest
(
"https://localhost:4201/foo"
,
method
=
"POST"
,
data
=
'abc'
)
f
=
urllib2
.
urlopen
(
req
)
...
...
@@ -409,7 +402,7 @@ class TestHttps(greentest.TestCase):
certificate_file
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'test_server.crt'
)
private_key_file
=
os
.
path
.
join
(
os
.
path
.
dirname
(
__file__
),
'test_server.key'
)
sock
=
socket
.
ssl_listener
((
''
,
4202
),
private_key_file
,
certificate_file
)
g
=
gevent
.
spawn
(
wsgi
.
server
,
sock
,
self
.
application
)
g
=
gevent
.
spawn
(
self
.
get_wsgi_module
()
.
server
,
sock
,
self
.
application
)
try
:
req
=
HTTPRequest
(
"https://localhost:4202/foo"
)
f
=
urllib2
.
urlopen
(
req
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment