Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
da8eb17b
Commit
da8eb17b
authored
Sep 08, 2009
by
Denis Bilenko
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
add greentest/test__http.py
parent
a2e96a3a
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
191 additions
and
0 deletions
+191
-0
greentest/test__http.py
greentest/test__http.py
+191
-0
No files found.
greentest/test__http.py
0 → 100644
View file @
da8eb17b
from
__future__
import
with_statement
from
gevent
import
monkey
;
monkey
.
patch_socket
()
import
gevent
from
gevent
import
http
import
greentest
import
os
from
urllib2
import
urlopen
,
URLError
,
HTTPError
import
socket
# add test for "chunked POST input -> chunked output"
class
Expected
(
Exception
):
pass
class
MainLoopServer
(
http
.
HTTPServer
):
spawn
=
None
class
BoundTestCase
(
greentest
.
TestCase
):
address
=
'127.0.0.1'
spawn
=
False
def
setUp
(
self
):
if
self
.
spawn
:
self
.
http
=
http
.
HTTPServer
(
self
.
handle
)
else
:
self
.
http
=
MainLoopServer
(
self
.
handle
)
s
=
self
.
http
.
start
((
self
.
address
,
0
))
self
.
port
=
s
.
getsockname
()[
1
]
def
tearDown
(
self
):
#self.print_netstat('before stop')
with
gevent
.
Timeout
(
0.1
,
False
):
self
.
http
.
stop
()
#self.print_netstat('after stop')
self
.
check_refused
()
def
print_netstat
(
self
,
comment
=
''
):
cmd
=
'sudo netstat -anp | grep %s'
%
self
.
port
print
cmd
,
' # %s'
%
comment
os
.
system
(
cmd
)
@
property
def
url
(
self
):
return
'http://%s:%s'
%
(
self
.
address
,
self
.
port
)
def
connect
(
self
):
s
=
socket
.
socket
()
s
.
connect
((
self
.
address
,
self
.
port
))
return
s
def
check_refused
(
self
):
try
:
self
.
connect
()
except
socket
.
error
,
e
:
if
'ECONNREFUSED'
not
in
str
(
e
):
raise
except
IOError
,
e
:
print
'WARNING: instead of ECONNREFUSED got IOError: %s'
%
e
class
TestStop
(
BoundTestCase
):
# this triggers if connection_closed is not handled properly
#p: http.c:1921: evhttp_send: Assertion `((&evcon->requests)->tqh_first) == req' failed.
def
reply
(
self
,
r
):
# at this point object that was wrapped by r no longer exists
r
.
send_reply
(
200
,
'OK'
,
'hello world'
)
def
handle
(
self
,
r
):
# gonna reply later, when the connection is closed by the client
return
gevent
.
spawn_later
(
0.01
,
self
.
reply
,
r
)
def
test
(
self
):
s
=
self
.
connect
()
s
.
sendall
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
s
.
sendall
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
s
.
close
()
self
.
http
.
stop
()
gevent
.
sleep
(
0.02
)
class
TestSendReply
(
BoundTestCase
):
def
handle
(
self
,
r
):
r
.
send_reply
(
200
,
'OK'
,
'hello world'
)
def
test
(
self
):
response
=
urlopen
(
self
.
url
)
assert
response
.
code
==
200
,
response
assert
response
.
msg
==
'OK'
,
response
data
=
response
.
read
()
assert
data
==
'hello world'
,
data
def
test_keepalive
(
self
):
s
=
self
.
connect
()
s
.
sendall
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
s
.
sendall
(
'GET / HTTP/1.1
\
r
\
n
Host: localhost
\
r
\
n
\
r
\
n
'
)
class
TestSendReplySpawn
(
TestSendReply
):
spawn
=
True
class
TestException
(
BoundTestCase
):
def
handle
(
self
,
r
):
raise
Expected
def
test
(
self
):
try
:
response
=
urlopen
(
self
.
url
)
except
HTTPError
,
e
:
assert
e
.
code
==
500
,
e
assert
e
.
msg
==
'Internal Server Error'
,
e
class
TestExceptionSpawn
(
TestException
):
spawn
=
True
class
TestSendReplyLater
(
BoundTestCase
):
spawn
=
True
def
handle
(
self
,
r
):
gevent
.
sleep
(
0.01
)
r
.
send_reply
(
200
,
'OK'
,
'hello world'
)
def
test
(
self
):
response
=
urlopen
(
self
.
url
)
#print 'connected to %s' % self.url
assert
response
.
code
==
200
,
response
assert
response
.
msg
==
'OK'
,
response
data
=
response
.
read
()
#print 'read data from %s' % self.url
assert
data
==
'hello world'
,
data
def
test_client_closes_10
(
self
):
s
=
self
.
connect
()
s
.
sendall
(
'GET / HTTP/1.0
\
r
\
n
\
r
\
n
'
)
s
.
close
()
gevent
.
sleep
(
0.02
)
def
test_client_closes_11
(
self
):
s
=
self
.
connect
()
s
.
sendall
(
'GET / HTTP/1.1
\
r
\
n
\
r
\
n
'
)
s
.
close
()
gevent
.
sleep
(
0.02
)
# class TestSendReplyStartChunk(BoundTestCase):
# spawn = True
#
# def handle(self, r):
# r.send_reply_start(200, 'OK')
# gevent.sleep(0.2)
# print 'handler sending chunk'
# r.send_reply_chunk('hi')
# print 'handler done'
#
# def test(self):
# response = urlopen(self.url)
# print 'connected to %s' % self.url
# assert response.code == 200, response
# assert response.msg == 'OK', response
# with gevent.Timeout(0.1, False):
# data = response.read()
# assert 'should not read anything', repr(data)
# self.print_netstat('before response.close')
# response.close()
# self.print_netstat('after response.close')
# print 123
# gevent.sleep(0.5)
# print 1234
#
# def test_client_closes(self):
# s = self.connect()
# s.sendall('GET / HTTP/1.0\r\n\r\n')
# gevent.sleep(0.1)
# #self.print_netstat('before close')
# s.close()
# #self.print_netstat('after close')
# gevent.sleep(0.5)
if
__name__
==
'__main__'
:
greentest
.
main
()
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment