Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
G
gevent
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Issues
0
Issues
0
List
Boards
Labels
Milestones
Merge Requests
0
Merge Requests
0
Analytics
Analytics
Repository
Value Stream
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Create a new issue
Commits
Issue Boards
Open sidebar
Kirill Smelkov
gevent
Commits
2524fac1
Commit
2524fac1
authored
Jan 07, 2012
by
Denis Bilenko
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
update test__examples.py
parent
3d7bf25b
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
97 additions
and
32 deletions
+97
-32
greentest/test__examples.py
greentest/test__examples.py
+97
-32
No files found.
greentest/test__examples.py
View file @
2524fac1
...
...
@@ -13,21 +13,19 @@ import re
import
gevent
from
gevent
import
socket
import
mysubprocess
as
subprocess
from
gevent.server
import
DatagramServer
,
StreamServer
# Ignore tracebacks: KeyboardInterrupt
base_dir
=
normpath
(
join
(
dirname
(
abspath
(
__file__
)),
'..'
))
glob_expression
=
join
(
base_dir
,
'examples'
,
'*.py'
)
examples
=
glob
.
glob
(
glob_expression
)
examples_directory
=
normpath
(
join
(
dirname
(
abspath
(
__file__
)),
'..'
,
'examples'
))
examples
=
[
basename
(
x
)
for
x
in
glob
.
glob
(
examples_directory
+
'/*.py'
)]
simple_examples
=
[]
examples_directory
=
dirname
(
examples
[
0
])
for
example
in
examples
:
if
'serve_forever'
not
in
open
(
example
).
read
():
if
'serve_forever'
not
in
open
(
join
(
examples_directory
,
example
)
).
read
():
simple_examples
.
append
(
example
)
print
(
'
\
n
'
.
join
(
examples
))
def
make_test
(
path
):
...
...
@@ -38,40 +36,37 @@ def make_test(path):
if
' '
in
path
:
path
=
'"%s"'
%
path
class
Test
Example
(
unittest
.
TestCase
):
class
Test
(
unittest
.
TestCase
):
def
test
(
self
):
exe
=
sys
.
executable
if
' '
in
exe
:
exe
=
'"%s"'
%
exe
cmd
=
'%s %s'
%
(
exe
,
path
)
print
>>
sys
.
stderr
,
cmd
res
=
os
.
system
(
cmd
)
assert
not
res
,
'%s failed with %s'
%
(
path
,
res
)
run_script
(
self
.
path
)
TestExample
.
__name__
=
'TestExample_'
+
basename
(
path
).
split
(
'.'
)[
0
]
Test
.
__name__
=
'Test_'
+
basename
(
path
).
split
(
'.'
)[
0
]
assert
Test
.
__name__
not
in
globals
(),
Test
.
__name__
Test
.
path
=
path
return
Test
Example
return
Test
for
example
in
simple_examples
:
test
=
make_test
(
example
)
if
test
is
not
None
:
globals
()[
test
.
__name__
]
=
test
print
(
'Added %s'
%
test
.
__name__
)
del
test
def
run_script
(
path
,
*
args
)
:
cmd
=
[
sys
.
executable
,
join
(
examples_directory
,
path
)]
+
list
(
args
)
popen
=
subprocess
.
Popen
(
cmd
)
result
=
popen
.
gevent_wait
()
if
result
!=
0
:
raise
AssertionError
(
'%r failed with code %s'
%
(
cmd
,
result
))
class
BaseTestServer
(
unittest
.
TestCase
):
args
=
[]
def
setUp
(
self
):
self
.
process
=
subprocess
.
Popen
([
sys
.
executable
,
join
(
examples_directory
,
self
.
path
)],
cwd
=
examples_directory
)
self
.
process
=
subprocess
.
Popen
([
sys
.
executable
,
join
(
examples_directory
,
self
.
path
)]
+
self
.
args
,
cwd
=
examples_directory
)
time
.
sleep
(
1
)
def
tearDown
(
self
):
self
.
assertEqual
(
self
.
process
.
poll
(),
None
)
self
.
process
.
interrupt
()
time
.
sleep
(
0.5
)
time
.
sleep
(
0.
0
5
)
class
Test_httpserver
(
BaseTestServer
):
...
...
@@ -192,19 +187,89 @@ class Test_echoserver(BaseTestServer):
gevent
.
joinall
([
client1
,
client2
],
raise_error
=
True
)
class
Test_udp_client
(
unittest
.
TestCase
):
path
=
'udp_client.py'
def
test
(
self
):
log
=
[]
def
handle
(
message
,
address
):
log
.
append
(
message
)
server
.
sendto
(
'reply-from-server'
,
address
)
server
=
DatagramServer
(
'127.0.0.1:9000'
,
handle
)
server
.
start
()
try
:
run_script
(
self
.
path
,
'Test_udp_client'
)
finally
:
server
.
close
()
self
.
assertEqual
(
log
,
[
'Test_udp_client'
])
class
Test_udp_server
(
BaseTestServer
):
path
=
'udp_server.py'
def
test
(
self
):
address
=
(
'localhost'
,
9000
)
sock
=
socket
.
socket
(
type
=
socket
.
SOCK_DGRAM
)
sock
.
connect
(
address
)
sock
.
send
(
'Test_udp_server'
)
data
,
address
=
sock
.
recvfrom
(
8192
)
self
.
assertEqual
(
data
,
'Received 15 bytes'
)
class
Test_portforwarder
(
BaseTestServer
):
path
=
'portforwarder.py'
args
=
[
'127.0.0.5:9999'
,
'127.0.0.6:9999'
]
def
test
(
self
):
log
=
[]
def
handle
(
socket
,
address
):
while
True
:
data
=
socket
.
recv
(
1024
)
log
.
append
(
data
)
if
not
data
:
break
server
=
StreamServer
(
'127.0.0.6:9999'
,
handle
)
server
.
start
()
try
:
conn
=
socket
.
create_connection
((
'127.0.0.5'
,
9999
))
conn
.
sendall
(
'msg1'
)
gevent
.
sleep
(
0.01
)
self
.
assertEqual
(
log
,
[
'msg1'
])
conn
.
sendall
(
'msg2'
)
conn
.
close
()
finally
:
server
.
close
()
gevent
.
sleep
(
0.01
)
tests
=
set
()
for
klass
in
globals
().
keys
():
if
klass
.
startswith
(
'Test'
):
path
=
getattr
(
globals
()[
klass
],
'path'
,
None
)
if
path
is
not
None
:
tests
.
add
(
path
)
for
example
in
simple_examples
:
if
example
in
tests
:
continue
test
=
make_test
(
example
)
if
test
is
not
None
:
globals
()[
test
.
__name__
]
=
test
print
(
'Added %s'
%
test
.
__name__
)
del
test
class
TestAllTested
(
unittest
.
TestCase
):
def
test
(
self
):
tests
=
set
()
for
klass
in
globals
():
if
klass
.
startswith
(
'Test'
):
path
=
getattr
(
globals
()[
klass
],
'path'
,
None
)
if
path
is
not
None
:
tests
.
add
(
path
)
untested
=
set
(
examples
)
-
set
(
simple_examples
)
untested
=
set
(
basename
(
path
)
for
path
in
untested
)
-
tests
assert
not
untested
,
'The following examples have not been tested: %s'
%
'
\
n
'
.
join
(
untested
)
if
untested
:
raise
AssertionError
(
'The following examples have not been tested: %s
\
n
- %s'
%
(
len
(
untested
),
'
\
n
- '
.
join
(
untested
)))
del
Test_httpserver
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment