Commit 9135575a authored by Denis Bilenko's avatar Denis Bilenko

test__examples.py: use gevent.subprocess instead of mysubprocess

better start up procedure: check that we started by connecting to an app; also use this check to make sure no other app is bound to this socket
parent d497141b
......@@ -7,17 +7,12 @@ if sys.version_info[0] == 3:
from urllib import request as urllib2
else:
import urllib2
import time
import signal
import re
from urlparse import urlparse
from time import time
import gevent
from gevent import socket
import mysubprocess as subprocess
from gevent import subprocess
from gevent.server import DatagramServer, StreamServer
import gevent.hub
gevent.hub.Hub.backend = (getattr(gevent.hub.Hub, 'backend') or '') + ',nochild'
# Ignore tracebacks: KeyboardInterrupt
......@@ -55,22 +50,80 @@ def make_test(path):
def run_script(path, *args):
cmd = [sys.executable, join(examples_directory, path)] + list(args)
popen = subprocess.Popen(cmd)
result = popen.gevent_wait()
if result != 0:
raise AssertionError('%r failed with code %s' % (cmd, result))
if popen.wait() != 0:
raise AssertionError('%r failed with code %s' % (cmd, popen.wait()))
def kill(popen, signum=9):
while True:
try:
popen.send_signal(signum)
except OSError, ex:
if ex.errno == 3: # No such process
return
raise
else:
gevent.sleep(0.01)
class BaseTestServer(unittest.TestCase):
args = []
stderr = None
def get_address(self):
parsed = urlparse(self.URL)
return parsed.hostname, parsed.port
def get_sock_type(self):
parsed = urlparse(self.URL)
if parsed.scheme.lower() == 'udp':
return socket.SOCK_DGRAM
return socket.SOCK_STREAM
def assert_refused(self):
if self.get_sock_type() == socket.SOCK_DGRAM:
return
try:
self.connect()
except socket.error, ex:
if 'refused' not in str(ex):
raise
else:
raise AssertionError('Connection to %s succeeds before starting the server' % self.URL)
def connect(self):
sock = socket.socket(type=self.get_sock_type())
sock.connect(self.get_address())
return sock
def wait_start(self, timeout):
end = time() + timeout
while time() <= end:
if self.process.poll() is not None:
raise AssertionError('Process died')
try:
self.connect()
except socket.error:
gevent.sleep(0.01)
else:
return
else:
raise AssertionError('Failed to start')
def setUp(self):
self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)] + self.args, cwd=examples_directory)
time.sleep(1)
self.assert_refused()
self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)] + self.args,
cwd=examples_directory, stderr=self.stderr)
try:
self.wait_start(1)
except:
kill(self.process)
raise
assert self.process.poll() is None, self.process
def tearDown(self):
self.assertEqual(self.process.poll(), None)
self.process.interrupt()
time.sleep(0.05)
kill(self.process)
class Test_httpserver(BaseTestServer):
......@@ -108,39 +161,14 @@ class Test_wsgiserver(Test_httpserver):
path = 'wsgiserver.py'
if hasattr(socket, 'ssl'):
class Test_wsgiserver_ssl(Test_httpserver):
class Test_wsgiserver_ssl(Test_httpserver):
path = 'wsgiserver_ssl.py'
URL = 'https://localhost:8443'
else:
class Test_wsgiserver_ssl(unittest.TestCase):
path = 'wsgiserver_ssl.py'
def setUp(self):
self.process = subprocess.Popen([sys.executable, join(examples_directory, self.path)],
cwd=examples_directory, stderr=subprocess.PIPE)
time.sleep(1)
def test(self):
self.assertEqual(self.process.poll(), 1)
stderr = self.process.stderr.read().strip()
m = re.match('Traceback \(most recent call last\):.*?ImportError: .*?ssl.*', stderr, re.DOTALL)
assert m is not None, repr(stderr)
def tearDown(self):
if self.process.poll() is None:
try:
SIGINT = getattr(signal, 'SIGINT', None)
if SIGINT is not None:
os.kill(self.process.pid, SIGINT)
time.sleep(0.1)
self.assertEqual(self.process.poll(), 1)
finally:
if self.process.poll() is None:
self.process.kill()
def connect(self):
sock = super(Test_wsgiserver_ssl, self).connect()
from gevent import ssl
return ssl.wrap_socket(sock)
class Test_webpy(Test_httpserver):
......@@ -153,9 +181,9 @@ class Test_webpy(Test_httpserver):
assert "Hello, world" in data, repr(data)
def _test_long(self):
start = time.time()
start = time()
status, data = self.read('/long')
delay = time.time() - start
delay = time() - start
assert 10 - 0.1 < delay < 10 + 0.1, delay
self.assertEqual(status, '200 OK')
self.assertEqual(data, 'Hello, 10 seconds later')
......@@ -174,11 +202,12 @@ class Test_webproxy(Test_httpserver):
class Test_echoserver(BaseTestServer):
URL = 'tcp://127.0.0.1:6000'
path = 'echoserver.py'
def test(self):
def test_client(message):
conn = socket.create_connection(('127.0.0.1', 6000)).makefile(bufsize=1)
conn = self.connect().makefile(bufsize=1)
welcome = conn.readline()
assert 'Welcome' in welcome, repr(welcome)
conn.write(message)
......@@ -211,11 +240,11 @@ class Test_udp_client(unittest.TestCase):
class Test_udp_server(BaseTestServer):
path = 'udp_server.py'
URL = 'udp://localhost:9000'
def test(self):
address = ('localhost', 9000)
sock = socket.socket(type=socket.SOCK_DGRAM)
sock.connect(address)
gevent.sleep(0.3)
sock = self.connect()
sock.send('Test_udp_server')
data, address = sock.recvfrom(8192)
self.assertEqual(data, 'Received 15 bytes')
......@@ -224,6 +253,7 @@ class Test_udp_server(BaseTestServer):
class Test_portforwarder(BaseTestServer):
path = 'portforwarder.py'
args = ['127.0.0.5:9999', '127.0.0.6:9999']
URL = 'tcp://' + args[0]
def test(self):
log = []
......@@ -231,22 +261,31 @@ class Test_portforwarder(BaseTestServer):
def handle(socket, address):
while True:
data = socket.recv(1024)
log.append(data)
if not data:
break
log.append(data)
server = StreamServer('127.0.0.6:9999', handle)
server = StreamServer(self.args[1], handle)
server.start()
try:
conn = socket.create_connection(('127.0.0.5', 9999))
conn = socket.create_connection(self.get_address())
# make sure the connection is accepted at app level rather than at OS level
# before sending a signal
conn.sendall('msg1')
gevent.sleep(0.1)
self.assertEqual(log, ['msg1'])
self.process.send_signal(15)
# now let's make sure the signal was received
gevent.sleep(0.1)
conn.sendall('msg2')
conn.close()
finally:
server.close()
with gevent.Timeout(0.1):
self.process.wait()
self.assertEqual(['msg1', 'msg2'], log)
tests = set()
for klass in globals().keys():
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment