Commit d98833c9 authored by Denis Bilenko's avatar Denis Bilenko

many fixes for the testsuite; rewrite testrunner

The stdlib tests are now included in the repository. since ubuntu
does not ships them in any package.

The testrunner is completely rewritten. When testrunner.py was originally written
a lot of tests were failing and some of them were hanging. Thus testrunner.py
was focused on proving good reports and could retry the tests with a particular
test case disabled if it detected time out. Now testrunner is somewhat simpler as
we mostly concerned whether the tests pass or not. It also runs the tests
concurrently, to improve the speed at which we get the results back from travis-ci.
Some tests time execution and assert certain timings, this can fail, especially when
the number of workers is bigger than cpu count. The new testrunner will re-try
tests that failed in a sequential run, when nothing else is run. If this fixes
the problem, the test is reported as "succeeded, but only on the second try" and does
reports the whole suite as failed.

.travis.yml is fixed to download binary packages for all dependencies and use system python.

rename test__benchmarks.py to xtest__benchmarks.py to exclude it form testrunner.py.

Exclude test__count from test_thread.py, because it uses function thread._count() we don't implement

Run monkey patched tests twice, first with patch_all(), then with patch_all(Event=True)
parent 66a6f573
language: python
python:
- "2.5"
# - "2.5"
- "2.6"
- "2.7"
# - "3.2"
env:
-
- GEVENT_RESOLVER=ares
- GEVENT_FILE=thread
- GEVENT_BACKEND=select
- GEVENT_BACKEND=poll
- GEVENT_BACKEND=signalfd
- GEVENT_BACKEND=nosigmask
- GEVENT_RESOLVER=ares GEVENT_BACKEND=signalfd
- GEVENT_RESOLVER=ares GEVENT_FILE=thread
install:
- pip install cython -q --use-mirrors
- deactivate
- export VER=$TRAVIS_PYTHON_VERSION
- export PYTHON=/usr/bin/python$VER
- echo $PYTHON && $PYTHON --version
#broken - if [[ $VER == '2.5' ]]; then sudo apt-get install libssl-dev libkrb5-dev libbluetooth-dev; pip install --use-mirrors sslfix; fi
- curl -sSLO --retry 5 --fail https://github.com/downloads/SiteSupport/gevent/python2.7-cython_0.17_i386.deb
- curl -sSLO --retry 5 --fail https://github.com/downloads/SiteSupport/gevent/python$VER-greenlet_0.4.0_i386.deb
- curl -sSLO --retry 5 --fail https://github.com/downloads/SiteSupport/gevent/python$VER-psycopg2_2.4.5_i386.deb
- curl -sSLO --retry 5 --fail https://github.com/downloads/SiteSupport/gevent/python$VER-pysendfile_2.0.0_i386.deb
- curl -sSLO --retry 5 --fail http://pypi.python.org/packages/source/w/web.py/web.py-0.37.tar.gz #md5=93375e3f03e74d6bf5c5096a4962a8db
- sudo dpkg -i python2.7-cython_0.17_i386.deb
- sudo dpkg -i python$VER-greenlet_0.4.0_i386.deb
- sudo dpkg -i python$VER-psycopg2_2.4.5_i386.deb
- sudo dpkg -i python$VER-pysendfile_2.0.0_i386.deb
- tar -xf web.py-0.37.tar.gz && cd web.py-0.37 && sudo $PYTHON setup.py -q install && cd -
- cython --version
- pip install greenlet --use-mirrors
- python setup.py install
script: cd greentest && python testrunner.py
- $PYTHON -c 'import greenlet; print greenlet, greenlet.__version__; import psycopg2; print psycopg2, psycopg2.__version__; import web; print web, web.__version__'
- "echo `date`: Started installing gevent"
- sudo $PYTHON setup.py install
script:
- "echo `date`: Started testing"
- cd greentest && $PYTHON testrunner.py
- "echo `date`: Finished testing"
notifications:
email:
recipients:
- denis.bilenko@gmail.com
on_success: change
on_failure: change
......@@ -33,6 +33,7 @@ class PortForwarder(StreamServer):
return
gevent.spawn(forward, source, dest)
gevent.spawn(forward, dest, source)
# XXX only one spawn() is needed
def close(self):
if self.closed:
......
......@@ -25,6 +25,7 @@ class long_polling:
time.sleep(10) # possible to block the request indefinitely, without harming others
return 'Hello, 10 seconds later'
if __name__ == "__main__":
application = web.application(urls, globals()).wsgifunc()
print 'Serving on 8088...'
......
import httplib
import StringIO
import sys
from unittest import TestCase
from test import test_support
class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO):
self.text = text
self.fileclass = fileclass
def sendall(self, data):
self.data = data
def makefile(self, mode, bufsize=None):
if mode != 'r' and mode != 'rb':
raise httplib.UnimplementedFileMode()
return self.fileclass(self.text)
class NoEOFStringIO(StringIO.StringIO):
"""Like StringIO, but raises AssertionError on EOF.
This is used below to test that httplib doesn't try to read
more from the underlying file than it should.
"""
def read(self, n=-1):
data = StringIO.StringIO.read(self, n)
if data == '':
raise AssertionError('caller tried to read past EOF')
return data
def readline(self, length=None):
data = StringIO.StringIO.readline(self, length)
if data == '':
raise AssertionError('caller tried to read past EOF')
return data
class HeaderTests(TestCase):
def test_auto_headers(self):
# Some headers are added automatically, but should not be added by
# .request() if they are explicitly set.
import httplib
class HeaderCountingBuffer(list):
def __init__(self):
self.count = {}
def append(self, item):
kv = item.split(':')
if len(kv) > 1:
# item is a 'Key: Value' header string
lcKey = kv[0].lower()
self.count.setdefault(lcKey, 0)
self.count[lcKey] += 1
list.append(self, item)
for explicit_header in True, False:
for header in 'Content-length', 'Host', 'Accept-encoding':
conn = httplib.HTTPConnection('example.com')
conn.sock = FakeSocket('blahblahblah')
conn._buffer = HeaderCountingBuffer()
body = 'spamspamspam'
headers = {}
if explicit_header:
headers[header] = str(len(body))
conn.request('POST', '/', body, headers)
self.assertEqual(conn._buffer.count[header.lower()], 1)
# Collect output to a buffer so that we don't have to cope with line-ending
# issues across platforms. Specifically, the headers will have \r\n pairs
# and some platforms will strip them from the output file.
def test():
buf = StringIO.StringIO()
_stdout = sys.stdout
try:
sys.stdout = buf
_test()
finally:
sys.stdout = _stdout
# print individual lines with endings stripped
s = buf.getvalue()
for line in s.split("\n"):
print line.strip()
def _test():
# Test HTTP status lines
body = "HTTP/1.1 200 Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock, 1)
resp.begin()
print resp.read()
resp.close()
body = "HTTP/1.1 400.100 Not Ok\r\n\r\nText"
sock = FakeSocket(body)
resp = httplib.HTTPResponse(sock, 1)
try:
resp.begin()
except httplib.BadStatusLine:
print "BadStatusLine raised as expected"
else:
print "Expect BadStatusLine"
# Check invalid host_port
for hp in ("www.python.org:abc", "www.python.org:"):
try:
h = httplib.HTTP(hp)
except httplib.InvalidURL:
print "InvalidURL raised as expected"
else:
print "Expect InvalidURL"
for hp,h,p in (("[fe80::207:e9ff:fe9b]:8000", "fe80::207:e9ff:fe9b", 8000),
("www.python.org:80", "www.python.org", 80),
("www.python.org", "www.python.org", 80),
("[fe80::207:e9ff:fe9b]", "fe80::207:e9ff:fe9b", 80)):
try:
http = httplib.HTTP(hp)
except httplib.InvalidURL:
print "InvalidURL raised erroneously"
c = http._conn
if h != c.host: raise AssertionError, ("Host incorrectly parsed", h, c.host)
if p != c.port: raise AssertionError, ("Port incorrectly parsed", p, c.host)
# test response with multiple message headers with the same field name.
text = ('HTTP/1.1 200 OK\r\n'
'Set-Cookie: Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"\r\n'
'Set-Cookie: Part_Number="Rocket_Launcher_0001"; Version="1";'
' Path="/acme"\r\n'
'\r\n'
'No body\r\n')
hdr = ('Customer="WILE_E_COYOTE"; Version="1"; Path="/acme"'
', '
'Part_Number="Rocket_Launcher_0001"; Version="1"; Path="/acme"')
s = FakeSocket(text)
r = httplib.HTTPResponse(s, 1)
r.begin()
cookies = r.getheader("Set-Cookie")
if cookies != hdr:
raise AssertionError, "multiple headers not combined properly"
# Test that the library doesn't attempt to read any data
# from a HEAD request. (Tickles SF bug #622042.)
sock = FakeSocket(
'HTTP/1.1 200 OK\r\n'
'Content-Length: 14432\r\n'
'\r\n',
NoEOFStringIO)
resp = httplib.HTTPResponse(sock, 1, method="HEAD")
resp.begin()
if resp.read() != "":
raise AssertionError, "Did not expect response from HEAD request"
resp.close()
class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
def test_main(verbose=None):
tests = [HeaderTests,OfflineTest]
test_support.run_unittest(*tests)
test()
# Some simple Queue module tests, plus some failure conditions
# to ensure the Queue locks remain stable.
import Queue
import sys
import threading
import time
from test.test_support import verify, TestFailed, verbose
QUEUE_SIZE = 5
# A thread to run a function that unclogs a blocked Queue.
class _TriggerThread(threading.Thread):
def __init__(self, fn, args):
self.fn = fn
self.args = args
self.startedEvent = threading.Event()
threading.Thread.__init__(self)
def run(self):
# The sleep isn't necessary, but is intended to give the blocking
# function in the main thread a chance at actually blocking before
# we unclog it. But if the sleep is longer than the timeout-based
# tests wait in their blocking functions, those tests will fail.
# So we give them much longer timeout values compared to the
# sleep here (I aimed at 10 seconds for blocking functions --
# they should never actually wait that long - they should make
# progress as soon as we call self.fn()).
time.sleep(0.1)
self.startedEvent.set()
self.fn(*self.args)
# Execute a function that blocks, and in a separate thread, a function that
# triggers the release. Returns the result of the blocking function.
# Caution: block_func must guarantee to block until trigger_func is
# called, and trigger_func must guarantee to change queue state so that
# block_func can make enough progress to return. In particular, a
# block_func that just raises an exception regardless of whether trigger_func
# is called will lead to timing-dependent sporadic failures, and one of
# those went rarely seen but undiagnosed for years. Now block_func
# must be unexceptional. If block_func is supposed to raise an exception,
# call _doExceptionalBlockingTest() instead.
def _doBlockingTest(block_func, block_args, trigger_func, trigger_args):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not t.startedEvent.isSet():
raise TestFailed("blocking function '%r' appeared not to block" %
block_func)
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
return result
# Call this instead if block_func is supposed to raise an exception.
def _doExceptionalBlockingTest(block_func, block_args, trigger_func,
trigger_args, expected_exception_class):
t = _TriggerThread(trigger_func, trigger_args)
t.start()
try:
try:
block_func(*block_args)
except expected_exception_class:
raise
else:
raise TestFailed("expected exception of kind %r" %
expected_exception_class)
finally:
t.join(10) # make sure the thread terminates
if t.isAlive():
raise TestFailed("trigger function '%r' appeared to not return" %
trigger_func)
if not t.startedEvent.isSet():
raise TestFailed("trigger thread ended but event never set")
# A Queue subclass that can provoke failure at a moment's notice :)
class FailingQueueException(Exception):
pass
class FailingQueue(Queue.Queue):
def __init__(self, *args):
self.fail_next_put = False
self.fail_next_get = False
Queue.Queue.__init__(self, *args)
def _put(self, item):
if self.fail_next_put:
self.fail_next_put = False
raise FailingQueueException, "You Lose"
return Queue.Queue._put(self, item)
def _get(self):
if self.fail_next_get:
self.fail_next_get = False
raise FailingQueueException, "You Lose"
return Queue.Queue._get(self)
def FailingQueueTest(q):
if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
for i in range(QUEUE_SIZE-1):
q.put(i)
# Test a failing non-blocking put.
q.fail_next_put = True
try:
q.put("oops", block=0)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.fail_next_put = True
try:
q.put("oops", timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
q.put("last")
verify(q.full(), "Queue should be full")
# Test a failing blocking put
q.fail_next_put = True
try:
_doBlockingTest(q.put, ("full",), q.get, ())
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
# Test a failing timeout put
q.fail_next_put = True
try:
_doExceptionalBlockingTest(q.put, ("full", True, 10), q.get, (),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# Check the Queue isn't damaged.
# put failed, but get succeeded - re-add
q.put("last")
verify(q.full(), "Queue should be full")
q.get()
verify(not q.full(), "Queue should not be full")
q.put("last")
verify(q.full(), "Queue should be full")
# Test a blocking put
_doBlockingTest( q.put, ("full",), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
q.put("first")
q.fail_next_get = True
try:
q.get()
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(not q.empty(), "Queue should not be empty")
q.fail_next_get = True
try:
q.get(timeout=0.1)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
verify(not q.empty(), "Queue should not be empty")
q.get()
verify(q.empty(), "Queue should be empty")
q.fail_next_get = True
try:
_doExceptionalBlockingTest(q.get, (), q.put, ('empty',),
FailingQueueException)
raise TestFailed("The queue didn't fail when it should have")
except FailingQueueException:
pass
# put succeeded, but get failed.
verify(not q.empty(), "Queue should not be empty")
q.get()
verify(q.empty(), "Queue should be empty")
def SimpleQueueTest(q):
if not q.empty():
raise RuntimeError, "Call this function with an empty queue"
# I guess we better check things actually queue correctly a little :)
q.put(111)
q.put(222)
verify(q.get() == 111 and q.get() == 222,
"Didn't seem to queue the correct data!")
for i in range(QUEUE_SIZE-1):
q.put(i)
verify(not q.empty(), "Queue should not be empty")
verify(not q.full(), "Queue should not be full")
q.put("last")
verify(q.full(), "Queue should be full")
try:
q.put("full", block=0)
raise TestFailed("Didn't appear to block with a full queue")
except Queue.Full:
pass
try:
q.put("full", timeout=0.01)
raise TestFailed("Didn't appear to time-out with a full queue")
except Queue.Full:
pass
# Test a blocking put
_doBlockingTest(q.put, ("full",), q.get, ())
_doBlockingTest(q.put, ("full", True, 10), q.get, ())
# Empty it
for i in range(QUEUE_SIZE):
q.get()
verify(q.empty(), "Queue should be empty")
try:
q.get(block=0)
raise TestFailed("Didn't appear to block with an empty queue")
except Queue.Empty:
pass
try:
q.get(timeout=0.01)
raise TestFailed("Didn't appear to time-out with an empty queue")
except Queue.Empty:
pass
# Test a blocking get
_doBlockingTest(q.get, (), q.put, ('empty',))
_doBlockingTest(q.get, (True, 10), q.put, ('empty',))
cum = 0
cumlock = threading.Lock()
def worker(q):
global cum
while True:
x = q.get()
if x is None:
q.task_done()
return
cumlock.acquire()
try:
cum += x
finally:
cumlock.release()
q.task_done()
def QueueJoinTest(q):
global cum
cum = 0
for i in (0,1):
threading.Thread(target=worker, args=(q,)).start()
for i in xrange(100):
q.put(i)
q.join()
verify(cum==sum(range(100)), "q.join() did not block until all tasks were done")
for i in (0,1):
q.put(None) # instruct the threads to close
q.join() # verify that you can join twice
def QueueTaskDoneTest(q):
try:
q.task_done()
except ValueError:
pass
else:
raise TestFailed("Did not detect task count going negative")
def test():
q = Queue.Queue()
QueueTaskDoneTest(q)
QueueJoinTest(q)
QueueJoinTest(q)
QueueTaskDoneTest(q)
q = Queue.Queue(QUEUE_SIZE)
# Do it a couple of times on the same queue
SimpleQueueTest(q)
SimpleQueueTest(q)
if verbose:
print "Simple Queue tests seemed to work"
q = FailingQueue(QUEUE_SIZE)
FailingQueueTest(q)
FailingQueueTest(q)
if verbose:
print "Failing Queue tests seemed to work"
test()
# Testing select module
from test.test_support import verbose, reap_children
import select
import os
# test some known error conditions
try:
rfd, wfd, xfd = select.select(1, 2, 3)
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
class Nope:
pass
class Almost:
def fileno(self):
return 'fileno'
try:
rfd, wfd, xfd = select.select([Nope()], [], [])
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
try:
rfd, wfd, xfd = select.select([Almost()], [], [])
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
try:
rfd, wfd, xfd = select.select([], [], [], 'not a number')
except TypeError:
pass
else:
print 'expected TypeError exception not raised'
def test():
import sys
if sys.platform[:3] in ('win', 'mac', 'os2', 'riscos'):
if verbose:
print "Can't test select easily on", sys.platform
return
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r')
for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
if verbose:
print 'timeout =', tout
rfd, wfd, xfd = select.select([p], [], [], tout)
if (rfd, wfd, xfd) == ([], [], []):
continue
if (rfd, wfd, xfd) == ([p], [], []):
line = p.readline()
if verbose:
print repr(line)
if not line:
if verbose:
print 'EOF'
break
continue
print 'Unexpected return values from select():', rfd, wfd, xfd
p.close()
reap_children()
test()
# Test the signal module
from test.test_support import verbose, TestSkipped, TestFailed, vereq
import signal
import os, sys, time
if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos':
raise TestSkipped, "Can't test signal on %s" % sys.platform
MAX_DURATION = 20 # Entire test should last at most 20 sec.
if verbose:
x = '-x'
else:
x = '+x'
pid = os.getpid()
if verbose:
print "test runner's pid is", pid
# Shell script that will send us asynchronous signals
script = """
(
set %(x)s
sleep 2
kill -HUP %(pid)d
sleep 2
kill -USR1 %(pid)d
sleep 2
kill -USR2 %(pid)d
) &
""" % vars()
a_called = b_called = False
def handlerA(*args):
global a_called
a_called = True
if verbose:
print "handlerA invoked", args
class HandlerBCalled(Exception):
pass
def handlerB(*args):
global b_called
b_called = True
if verbose:
print "handlerB invoked", args
raise HandlerBCalled, args
# Set up a child to send signals to us (the parent) after waiting long
# enough to receive the alarm. It seems we miss the alarm for some
# reason. This will hopefully stop the hangs on Tru64/Alpha.
# Alas, it doesn't. Tru64 appears to miss all the signals at times, or
# seemingly random subsets of them, and nothing done in force_test_exit
# so far has actually helped.
def force_test_exit():
# Sigh, both imports seem necessary to avoid errors.
import os
fork_pid = os.fork()
if fork_pid:
# In parent.
return fork_pid
# In child.
import os, time
try:
# Wait 5 seconds longer than the expected alarm to give enough
# time for the normal sequence of events to occur. This is
# just a stop-gap to try to prevent the test from hanging.
time.sleep(MAX_DURATION + 5)
print >> sys.__stdout__, ' child should not have to kill parent'
for signame in "SIGHUP", "SIGUSR1", "SIGUSR2", "SIGALRM":
os.kill(pid, getattr(signal, signame))
print >> sys.__stdout__, " child sent", signame, "to", pid
time.sleep(1)
finally:
os._exit(0)
# Install handlers.
hup = signal.signal(signal.SIGHUP, handlerA)
usr1 = signal.signal(signal.SIGUSR1, handlerB)
usr2 = signal.signal(signal.SIGUSR2, signal.SIG_IGN)
alrm = signal.signal(signal.SIGALRM, signal.default_int_handler)
try:
signal.alarm(MAX_DURATION)
vereq(signal.getsignal(signal.SIGHUP), handlerA)
vereq(signal.getsignal(signal.SIGUSR1), handlerB)
vereq(signal.getsignal(signal.SIGUSR2), signal.SIG_IGN)
vereq(signal.getsignal(signal.SIGALRM), signal.default_int_handler)
# Try to ensure this test exits even if there is some problem with alarm.
# Tru64/Alpha often hangs and is ultimately killed by the buildbot.
fork_pid = force_test_exit()
try:
signal.getsignal(4242)
raise TestFailed('expected ValueError for invalid signal # to '
'getsignal()')
except ValueError:
pass
try:
signal.signal(4242, handlerB)
raise TestFailed('expected ValueError for invalid signal # to '
'signal()')
except ValueError:
pass
try:
signal.signal(signal.SIGUSR1, None)
raise TestFailed('expected TypeError for non-callable')
except TypeError:
pass
# Launch an external script to send us signals.
# We expect the external script to:
# send HUP, which invokes handlerA to set a_called
# send USR1, which invokes handlerB to set b_called and raise
# HandlerBCalled
# send USR2, which is ignored
#
# Then we expect the alarm to go off, and its handler raises
# KeyboardInterrupt, finally getting us out of the loop.
os.system(script)
try:
print "starting pause() loop..."
while 1:
try:
if verbose:
print "call pause()..."
signal.pause()
if verbose:
print "pause() returned"
except HandlerBCalled:
if verbose:
print "HandlerBCalled exception caught"
except KeyboardInterrupt:
if verbose:
print "KeyboardInterrupt (the alarm() went off)"
if not a_called:
print 'HandlerA not called'
if not b_called:
print 'HandlerB not called'
finally:
# Forcibly kill the child we created to ping us if there was a test error.
try:
# Make sure we don't kill ourself if there was a fork error.
if fork_pid > 0:
os.kill(fork_pid, signal.SIGKILL)
except:
# If the child killed us, it has probably exited. Killing a
# non-existent process will raise an error which we don't care about.
pass
# Restore handlers.
signal.alarm(0) # cancel alarm in case we died early
signal.signal(signal.SIGHUP, hup)
signal.signal(signal.SIGUSR1, usr1)
signal.signal(signal.SIGUSR2, usr2)
signal.signal(signal.SIGALRM, alrm)
This diff is collapsed.
# Test just the SSL support in the socket module, in a moderately bogus way.
import sys
from test import test_support
import socket
import errno
# Optionally test SSL support. This requires the 'network' resource as given
# on the regrtest command line.
skip_expected = not (test_support.is_resource_enabled('network') and
hasattr(socket, "ssl"))
def test_basic():
test_support.requires('network')
import urllib
if test_support.verbose:
print "test_basic ..."
socket.RAND_status()
try:
socket.RAND_egd(1)
except TypeError:
pass
else:
print "didn't raise TypeError"
socket.RAND_add("this is a random string", 75.0)
try:
f = urllib.urlopen('https://sf.net')
except IOError, exc:
if exc.errno == errno.ETIMEDOUT:
raise test_support.ResourceDenied('HTTPS connection is timing out')
else:
raise
buf = f.read()
f.close()
def test_timeout():
test_support.requires('network')
def error_msg(extra_msg):
print >> sys.stderr, """\
WARNING: an attempt to connect to %r %s, in
test_timeout. That may be legitimate, but is not the outcome we hoped
for. If this message is seen often, test_timeout should be changed to
use a more reliable address.""" % (ADDR, extra_msg)
if test_support.verbose:
print "test_timeout ..."
# A service which issues a welcome banner (without need to write
# anything).
ADDR = "pop.gmail.com", 995
s = socket.socket()
s.settimeout(30.0)
try:
s.connect(ADDR)
except socket.timeout:
error_msg('timed out')
return
except socket.error, exc: # In case connection is refused.
if exc.args[0] == errno.ECONNREFUSED:
error_msg('was refused')
return
else:
raise
ss = socket.ssl(s)
# Read part of return welcome banner twice.
ss.read(1)
ss.read(1)
s.close()
def test_rude_shutdown():
if test_support.verbose:
print "test_rude_shutdown ..."
try:
import threading
except ImportError:
return
# Some random port to connect to.
PORT = [9934]
listener_ready = threading.Event()
listener_gone = threading.Event()
# `listener` runs in a thread. It opens a socket listening on PORT, and
# sits in an accept() until the main thread connects. Then it rudely
# closes the socket, and sets Event `listener_gone` to let the main thread
# know the socket is gone.
def listener():
s = socket.socket()
PORT[0] = test_support.bind_port(s, '', PORT[0])
s.listen(5)
listener_ready.set()
s.accept()
s = None # reclaim the socket object, which also closes it
listener_gone.set()
def connector():
listener_ready.wait()
s = socket.socket()
s.connect(('localhost', PORT[0]))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
except socket.sslerror:
pass
else:
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
t = threading.Thread(target=listener)
t.start()
connector()
t.join()
def test_main():
if not hasattr(socket, "ssl"):
raise test_support.TestSkipped("socket module has no ssl support")
test_rude_shutdown()
test_basic()
test_timeout()
if __name__ == "__main__":
test_main()
# Test suite for SocketServer.py
# converted to unittest (Denis)
from gevent import monkey
monkey.patch_all()
import sys
import test_support
from test_support import (verbose, verify, TESTFN, TestSkipped,
from test import test_support
from test.test_support import (verbose, verify, TESTFN, TestSkipped,
reap_children)
test_support.requires('network')
try:
from SocketServer import *
except ImportError:
from socketserver import *
from SocketServer import *
import socket
import errno
import select
import time
import threading
import sys
import os
import unittest
NREQ = 3
DELAY = 0.05
DELAY = 0.5
class MyMixinHandler:
def handle(self):
......@@ -48,29 +38,20 @@ class MyMixinServer:
self.server_close()
raise
teststring = "hello world\n"
if str is unicode:
def b(s):
return s.encode('ascii')
else:
def b(s):
return s
teststring = b("hello world\n")
NL = b('\n')
def receive(sock, n, timeout=5):
def receive(sock, n, timeout=20):
r, w, x = select.select([sock], [], [], timeout)
if sock in r:
return sock.recv(n)
else:
raise RuntimeError("timed out on %r" % (sock,))
raise RuntimeError, "timed out on %r" % (sock,)
def testdgram(proto, addr):
s = socket.socket(proto, socket.SOCK_DGRAM)
s.sendto(teststring, addr)
buf = data = receive(s, 100)
while data and NL not in buf:
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
......@@ -81,8 +62,7 @@ def teststream(proto, addr):
s.connect(addr)
s.sendall(teststring)
buf = data = receive(s, 100)
while data and NL not in buf:
while data and '\n' not in buf:
data = receive(s, 100)
buf += data
verify(buf == teststring)
......@@ -97,20 +77,19 @@ class ServerThread(threading.Thread):
def run(self):
class svrcls(MyMixinServer, self.__svrcls):
pass
if verbose: print ("thread: creating server")
if verbose: print "thread: creating server"
svr = svrcls(self.__addr, self.__hdlrcls)
# pull the address out of the server in case it changed
# this can happen if another process is using the port
addr = svr.server_address
if addr:
self.__addr = addr
if sys.version_info[:2] >= (2, 5):
if self.__addr != svr.socket.getsockname():
raise RuntimeError('server_address was %s, expected %s' %
(self.__addr, svr.socket.getsockname()))
if verbose: print ("thread: serving three times")
if verbose: print "thread: serving three times"
svr.serve_a_few()
if verbose: print ("thread: done")
if verbose: print "thread: done"
seed = 0
def pickport():
......@@ -124,7 +103,7 @@ def pickaddr(proto):
if proto == socket.AF_INET:
return (host, pickport())
else:
fn = '/tmp/' + TESTFN + str(pickport())
fn = TESTFN + str(pickport())
if os.name == 'os2':
# AF_UNIX socket names on OS/2 require a specific prefix
# which can't include a drive letter and must also use
......@@ -153,19 +132,19 @@ def testloop(proto, servers, hdlrcls, testfunc):
for svrcls in servers:
addr = pickaddr(proto)
if verbose:
print ("ADDR = %s" % (addr,))
print ("CLASS = %s" % svrcls)
print "ADDR =", addr
print "CLASS =", svrcls
t = ServerThread(addr, svrcls, hdlrcls)
if verbose: print ("server created")
if verbose: print "server created"
t.start()
if verbose: print ("server running")
if verbose: print "server running"
for i in range(NREQ):
time.sleep(DELAY)
if verbose: print ("test client %d" % i)
if verbose: print "test client", i
testfunc(proto, addr)
if verbose: print ("waiting for server")
if verbose: print "waiting for server"
t.join()
if verbose: print ("done")
if verbose: print "done"
class ForgivingTCPServer(TCPServer):
# prevent errors if another process is using the port we want
......@@ -179,8 +158,7 @@ class ForgivingTCPServer(TCPServer):
self.server_address = host, port
TCPServer.server_bind(self)
break
except socket.error:
err, msg = sys.exc_info()[1].args
except socket.error, (err, msg):
if err != errno.EADDRINUSE:
raise
print >>sys.__stderr__, \
......@@ -190,7 +168,6 @@ tcpservers = [ForgivingTCPServer, ThreadingTCPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
tcpservers.append(ForkingTCPServer)
udpservers = [UDPServer, ThreadingUDPServer]
if hasattr(os, 'fork') and os.name not in ('os2',):
udpservers.append(ForkingUDPServer)
......@@ -225,28 +202,6 @@ def testall():
# client address so this cannot work:
##testloop(socket.AF_UNIX, dgramservers, MyDatagramHandler, testdgram)
class Test(unittest.TestCase):
def tearDown(self):
sloppy_cleanup()
for tcpserver in tcpservers:
n = tcpserver.__name__
exec ("""def test_%s(self): testloop(socket.AF_INET, [%s], MyStreamHandler, teststream)""" % (n,n))
for udpserver in udpservers:
n = udpserver.__name__
exec ("""def test_%s(self): testloop(socket.AF_INET, [%s], MyDatagramHandler, testdgram)""" % (n,n))
if hasattr(socket, 'AF_UNIX'):
for streamserver in streamservers:
n = streamserver.__name__
exec ("""def test_%s(self): testloop(socket.AF_UNIX, [%s], MyStreamHandler, teststream)""" % (n,n))
def testall():
test_support.run_unittest(Test)
def test_main():
import imp
if imp.lock_held():
......
This diff is collapsed.
......@@ -2,10 +2,7 @@
# Create a bunch of threads, let each do some work, wait until all are done
from gevent import monkey
monkey.patch_all()
from test_support import verbose
from test.test_support import verbose
import random
import thread
import time
......@@ -21,13 +18,13 @@ numtasks = 10
def task(ident):
global running
rmutex.acquire()
delay = random.random() * numtasks * 0.02
delay = random.random() * numtasks
rmutex.release()
if verbose:
print ('task %s will run for %d sec' % (ident, round(delay, 2)))
print 'task', ident, 'will run for', round(delay, 1), 'sec'
time.sleep(delay)
if verbose:
print ('task %s done' % ident)
print 'task', ident, 'done'
mutex.acquire()
running = running - 1
if running == 0:
......@@ -40,7 +37,7 @@ def newtask():
mutex.acquire()
next_ident = next_ident + 1
if verbose:
print ('creating task %s' % next_ident)
print 'creating task', next_ident
thread.start_new_thread(task, (next_ident,))
running = running + 1
mutex.release()
......@@ -48,9 +45,9 @@ def newtask():
for i in range(numtasks):
newtask()
print ('waiting for all tasks to complete')
print 'waiting for all tasks to complete'
done.acquire()
print ('all tasks done')
print 'all tasks done'
class barrier:
def __init__(self, n):
......@@ -89,16 +86,16 @@ def task2(ident):
delay = 0.001
else:
rmutex.acquire()
delay = random.random() * numtasks * 0.02
delay = random.random() * numtasks
rmutex.release()
if verbose:
print ('task %s will run for %d sec' % (ident, round(delay, 2)))
print 'task', ident, 'will run for', round(delay, 1), 'sec'
time.sleep(delay)
if verbose:
print ('task %s entering barrier %s' % (ident, i))
print 'task', ident, 'entering barrier', i
bar.enter()
if verbose:
print ('task %s leaving barrier %s' % (ident, i))
print 'task', ident, 'leaving barrier', i
mutex.acquire()
running -= 1
# Must release mutex before releasing done, else the main thread can
......@@ -109,37 +106,36 @@ def task2(ident):
if finished:
done.release()
print ('\n*** Barrier Test ***')
print '\n*** Barrier Test ***'
if done.acquire(0):
raise ValueError("'done' should have remained acquired")
raise ValueError, "'done' should have remained acquired"
bar = barrier(numtasks)
running = numtasks
for i in range(numtasks):
thread.start_new_thread(task2, (i,))
done.acquire()
print ('all tasks done')
print 'all tasks done'
if hasattr(thread, 'stack_size'):
# not all platforms support changing thread stack size
print ('\n*** Changing thread stack size ***')
if thread.stack_size() != 0:
raise ValueError("initial stack_size not 0")
# not all platforms support changing thread stack size
print '\n*** Changing thread stack size ***'
if thread.stack_size() != 0:
raise ValueError, "initial stack_size not 0"
thread.stack_size(0)
if thread.stack_size() != 0:
raise ValueError("stack_size not reset to default")
thread.stack_size(0)
if thread.stack_size() != 0:
raise ValueError, "stack_size not reset to default"
from os import name as os_name
if os_name in ("nt", "os2", "posix"):
from os import name as os_name
if os_name in ("nt", "os2", "posix"):
tss_supported = 1
try:
thread.stack_size(4096)
except ValueError:
print ('caught expected ValueError setting stack_size(4096)')
print 'caught expected ValueError setting stack_size(4096)'
except thread.error:
tss_supported = 0
print ('platform does not support changing thread stack size')
print 'platform does not support changing thread stack size'
if tss_supported:
failed = lambda s, e: s != e
......@@ -147,18 +143,18 @@ if hasattr(thread, 'stack_size'):
for tss in (262144, 0x100000, 0):
thread.stack_size(tss)
if failed(thread.stack_size(), tss):
raise ValueError(fail_msg % tss)
print ('successfully set stack_size(%d)' % tss)
raise ValueError, fail_msg % tss
print 'successfully set stack_size(%d)' % tss
for tss in (262144, 0x100000):
print ('trying stack_size = %d' % tss)
print 'trying stack_size = %d' % tss
next_ident = 0
for i in range(numtasks):
newtask()
print ('waiting for all tasks to complete')
print 'waiting for all tasks to complete'
done.acquire()
print ('all tasks done')
print 'all tasks done'
# reset stack size to default
thread.stack_size(0)
This diff is collapsed.
import gc
import threading
import unittest
from doctest import DocTestSuite
from test import test_support
class ThreadingLocalTest(unittest.TestCase):
def test_derived(self):
# Issue 3088: if there is a threads switch inside the __init__
# of a threading.local derived class, the per-thread dictionary
# is created but not correctly set on the object.
# The first member set may be bogus.
import time
class Local(threading.local):
def __init__(self):
time.sleep(0.01)
local = Local()
def f(i):
local.x = i
# Simply check that the variable is correctly set
self.assertEqual(local.x, i)
threads= []
for i in range(10):
t = threading.Thread(target=f, args=(i,))
t.start()
threads.append(t)
for t in threads:
t.join()
def test_derived_cycle_dealloc(self):
# http://bugs.python.org/issue6990
class Local(threading.local):
pass
locals = None
passed = [False]
e1 = threading.Event()
e2 = threading.Event()
def f():
# 1) Involve Local in a cycle
cycle = [Local()]
cycle.append(cycle)
cycle[0].foo = 'bar'
# 2) GC the cycle (triggers threadmodule.c::local_clear
# before local_dealloc)
del cycle
gc.collect()
e1.set()
e2.wait()
# 4) New Locals should be empty
passed[0] = all(not hasattr(local, 'foo') for local in locals)
t = threading.Thread(target=f)
t.start()
e1.wait()
# 3) New Locals should recycle the original's address. Creating
# them in the thread overwrites the thread state and avoids the
# bug
locals = [Local() for i in range(10)]
e2.set()
t.join()
self.assertTrue(passed[0])
def test_main():
suite = DocTestSuite('_threading_local')
try:
from thread import _local
except ImportError:
pass
else:
import _threading_local
local_orig = _threading_local.local
def setUp(test):
_threading_local.local = _local
def tearDown(test):
_threading_local.local = local_orig
suite.addTest(DocTestSuite('_threading_local',
setUp=setUp, tearDown=tearDown)
)
suite.addTest(unittest.makeSuite(ThreadingLocalTest))
test_support.run_suite(suite)
if __name__ == '__main__':
test_main()
"""Unit tests for socket timeout feature."""
import unittest
from test import test_support
# This requires the 'network' resource as given on the regrtest command line.
skip_expected = not test_support.is_resource_enabled('network')
import time
import socket
class CreationTestCase(unittest.TestCase):
"""Test case for socket.gettimeout() and socket.settimeout()"""
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
def tearDown(self):
self.sock.close()
def testObjectCreation(self):
# Test Socket creation
self.assertEqual(self.sock.gettimeout(), None,
"timeout not disabled by default")
def testFloatReturnValue(self):
# Test return value of gettimeout()
self.sock.settimeout(7.345)
self.assertEqual(self.sock.gettimeout(), 7.345)
self.sock.settimeout(3)
self.assertEqual(self.sock.gettimeout(), 3)
self.sock.settimeout(None)
self.assertEqual(self.sock.gettimeout(), None)
def testReturnType(self):
# Test return type of gettimeout()
self.sock.settimeout(1)
self.assertEqual(type(self.sock.gettimeout()), type(1.0))
self.sock.settimeout(3.9)
self.assertEqual(type(self.sock.gettimeout()), type(1.0))
def testTypeCheck(self):
# Test type checking by settimeout()
self.sock.settimeout(0)
self.sock.settimeout(0L)
self.sock.settimeout(0.0)
self.sock.settimeout(None)
self.assertRaises(TypeError, self.sock.settimeout, "")
self.assertRaises(TypeError, self.sock.settimeout, u"")
self.assertRaises(TypeError, self.sock.settimeout, ())
self.assertRaises(TypeError, self.sock.settimeout, [])
self.assertRaises(TypeError, self.sock.settimeout, {})
self.assertRaises(TypeError, self.sock.settimeout, 0j)
def testRangeCheck(self):
# Test range checking by settimeout()
self.assertRaises(ValueError, self.sock.settimeout, -1)
self.assertRaises(ValueError, self.sock.settimeout, -1L)
self.assertRaises(ValueError, self.sock.settimeout, -1.0)
def testTimeoutThenBlocking(self):
# Test settimeout() followed by setblocking()
self.sock.settimeout(10)
self.sock.setblocking(1)
self.assertEqual(self.sock.gettimeout(), None)
self.sock.setblocking(0)
self.assertEqual(self.sock.gettimeout(), 0.0)
self.sock.settimeout(10)
self.sock.setblocking(0)
self.assertEqual(self.sock.gettimeout(), 0.0)
self.sock.setblocking(1)
self.assertEqual(self.sock.gettimeout(), None)
def testBlockingThenTimeout(self):
# Test setblocking() followed by settimeout()
self.sock.setblocking(0)
self.sock.settimeout(1)
self.assertEqual(self.sock.gettimeout(), 1)
self.sock.setblocking(1)
self.sock.settimeout(1)
self.assertEqual(self.sock.gettimeout(), 1)
class TimeoutTestCase(unittest.TestCase):
"""Test case for socket.socket() timeout functions"""
# There are a number of tests here trying to make sure that an operation
# doesn't take too much longer than expected. But competing machine
# activity makes it inevitable that such tests will fail at times.
# When fuzz was at 1.0, I (tim) routinely saw bogus failures on Win2K
# and Win98SE. Boosting it to 2.0 helped a lot, but isn't a real
# solution.
fuzz = 2.0
def setUp(self):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.addr_remote = ('www.python.org.', 80)
self.addr_local = ('127.0.0.1', 25339)
def tearDown(self):
self.sock.close()
def testConnectTimeout(self):
# Test connect() timeout
_timeout = 0.001
self.sock.settimeout(_timeout)
# If we are too close to www.python.org, this test will fail.
# Pick a host that should be farther away.
if (socket.getfqdn().split('.')[-2:] == ['python', 'org'] or
socket.getfqdn().split('.')[-2:-1] == ['xs4all']):
self.addr_remote = ('tut.fi', 80)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.connect,
self.addr_remote)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is more than %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testRecvTimeout(self):
# Test recv() timeout
_timeout = 0.02
self.sock.connect(self.addr_remote)
self.sock.settimeout(_timeout)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.recv, 1024)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testAcceptTimeout(self):
# Test accept() timeout
_timeout = 2
self.sock.settimeout(_timeout)
self.sock.bind(self.addr_local)
self.sock.listen(5)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.accept)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testRecvfromTimeout(self):
# Test recvfrom() timeout
_timeout = 2
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.settimeout(_timeout)
self.sock.bind(self.addr_local)
_t1 = time.time()
self.failUnlessRaises(socket.error, self.sock.recvfrom, 8192)
_t2 = time.time()
_delta = abs(_t1 - _t2)
self.assert_(_delta < _timeout + self.fuzz,
"timeout (%g) is %g seconds more than expected (%g)"
%(_delta, self.fuzz, _timeout))
def testSend(self):
# Test send() timeout
# couldn't figure out how to test it
pass
def testSendto(self):
# Test sendto() timeout
# couldn't figure out how to test it
pass
def testSendall(self):
# Test sendall() timeout
# couldn't figure out how to test it
pass
def test_main():
test_support.requires('network')
test_support.run_unittest(CreationTestCase, TimeoutTestCase)
if __name__ == "__main__":
test_main()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
Just bad cert data
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
Just bad cert data
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Bad Key, though the cert should be OK
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
Bad Key, though the cert should be OK
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIHPTCCBSWgAwIBAgIBADANBgkqhkiG9w0BAQQFADB5MRAwDgYDVQQKEwdSb290
IENBMR4wHAYDVQQLExVodHRwOi8vd3d3LmNhY2VydC5vcmcxIjAgBgNVBAMTGUNB
IENlcnQgU2lnbmluZyBBdXRob3JpdHkxITAfBgkqhkiG9w0BCQEWEnN1cHBvcnRA
Y2FjZXJ0Lm9yZzAeFw0wMzAzMzAxMjI5NDlaFw0zMzAzMjkxMjI5NDlaMHkxEDAO
BgNVBAoTB1Jvb3QgQ0ExHjAcBgNVBAsTFWh0dHA6Ly93d3cuY2FjZXJ0Lm9yZzEi
MCAGA1UEAxMZQ0EgQ2VydCBTaWduaW5nIEF1dGhvcml0eTEhMB8GCSqGSIb3DQEJ
ARYSc3VwcG9ydEBjYWNlcnQub3JnMIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIIC
CgKCAgEAziLA4kZ97DYoB1CW8qAzQIxL8TtmPzHlawI229Z89vGIj053NgVBlfkJ
8BLPRoZzYLdufujAWGSuzbCtRRcMY/pnCujW0r8+55jE8Ez64AO7NV1sId6eINm6
zWYyN3L69wj1x81YyY7nDl7qPv4coRQKFWyGhFtkZip6qUtTefWIonvuLwphK42y
fk1WpRPs6tqSnqxEQR5YYGUFZvjARL3LlPdCfgv3ZWiYUQXw8wWRBB0bF4LsyFe7
w2t6iPGwcswlWyCR7BYCEo8y6RcYSNDHBS4CMEK4JZwFaz+qOqfrU0j36NK2B5jc
G8Y0f3/JHIJ6BVgrCFvzOKKrF11myZjXnhCLotLddJr3cQxyYN/Nb5gznZY0dj4k
epKwDpUeb+agRThHqtdB7Uq3EvbXG4OKDy7YCbZZ16oE/9KTfWgu3YtLq1i6L43q
laegw1SJpfvbi1EinbLDvhG+LJGGi5Z4rSDTii8aP8bQUWWHIbEZAWV/RRyH9XzQ
QUxPKZgh/TMfdQwEUfoZd9vUFBzugcMd9Zi3aQaRIt0AUMyBMawSB3s42mhb5ivU
fslfrejrckzzAeVLIL+aplfKkQABi6F1ITe1Yw1nPkZPcCBnzsXWWdsC4PDSy826
YreQQejdIOQpvGQpQsgi3Hia/0PsmBsJUUtaWsJx8cTLc6nloQsCAwEAAaOCAc4w
ggHKMB0GA1UdDgQWBBQWtTIb1Mfz4OaO873SsDrusjkY0TCBowYDVR0jBIGbMIGY
gBQWtTIb1Mfz4OaO873SsDrusjkY0aF9pHsweTEQMA4GA1UEChMHUm9vdCBDQTEe
MBwGA1UECxMVaHR0cDovL3d3dy5jYWNlcnQub3JnMSIwIAYDVQQDExlDQSBDZXJ0
IFNpZ25pbmcgQXV0aG9yaXR5MSEwHwYJKoZIhvcNAQkBFhJzdXBwb3J0QGNhY2Vy
dC5vcmeCAQAwDwYDVR0TAQH/BAUwAwEB/zAyBgNVHR8EKzApMCegJaAjhiFodHRw
czovL3d3dy5jYWNlcnQub3JnL3Jldm9rZS5jcmwwMAYJYIZIAYb4QgEEBCMWIWh0
dHBzOi8vd3d3LmNhY2VydC5vcmcvcmV2b2tlLmNybDA0BglghkgBhvhCAQgEJxYl
aHR0cDovL3d3dy5jYWNlcnQub3JnL2luZGV4LnBocD9pZD0xMDBWBglghkgBhvhC
AQ0ESRZHVG8gZ2V0IHlvdXIgb3duIGNlcnRpZmljYXRlIGZvciBGUkVFIGhlYWQg
b3ZlciB0byBodHRwOi8vd3d3LmNhY2VydC5vcmcwDQYJKoZIhvcNAQEEBQADggIB
ACjH7pyCArpcgBLKNQodgW+JapnM8mgPf6fhjViVPr3yBsOQWqy1YPaZQwGjiHCc
nWKdpIevZ1gNMDY75q1I08t0AoZxPuIrA2jxNGJARjtT6ij0rPtmlVOKTV39O9lg
18p5aTuxZZKmxoGCXJzN600BiqXfEVWqFcofN8CCmHBh22p8lqOOLlQ+TyGpkO/c
gr/c6EWtTZBzCDyUZbAEmXZ/4rzCahWqlwQ3JNgelE5tDlG+1sSPypZt90Pf6DBl
Jzt7u0NDY8RD97LsaMzhGY4i+5jhe1o+ATc7iwiwovOVThrLm82asduycPAtStvY
sONvRUgzEv/+PDIqVPfE94rwiCPCR/5kenHA0R6mY7AHfqQv0wGP3J8rtsYIqQ+T
SCX8Ev2fQtzzxD72V7DX3WnRBnc0CkvSyqD/HMaMyRa+xMwyN2hzXwj7UfdJUzYF
CpUCTPJ5GhD22Dp1nPMd8aINcGeGG7MW9S/lpOt5hvk9C8JzC6WZrG/8Z7jlLwum
GCSNe9FINSkYQKyTYOGWhlC0elnYjyELn8+CkcY7v2vcB5G5l1YjqrZslMZIBjzk
zk6q5PYvCdxTby78dOs6Y5nCpqyJvKeyRKANihDjbPIky/qbn3BHLt4Ui9SyIAmW
omTxJBzcoTWcFbLUvFUufQb1nA5V9FrWk9p2rSVzTMVD
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXwIBAAKBgQC8ddrhm+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9L
opdJhTvbGfEj0DQs1IE8M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVH
fhi/VwovESJlaBOp+WMnfhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQAB
AoGBAK0FZpaKj6WnJZN0RqhhK+ggtBWwBnc0U/ozgKz2j1s3fsShYeiGtW6CK5nU
D1dZ5wzhbGThI7LiOXDvRucc9n7vUgi0alqPQ/PFodPxAN/eEYkmXQ7W2k7zwsDA
IUK0KUhktQbLu8qF/m8qM86ba9y9/9YkXuQbZ3COl5ahTZrhAkEA301P08RKv3KM
oXnGU2UHTuJ1MAD2hOrPxjD4/wxA/39EWG9bZczbJyggB4RHu0I3NOSFjAm3HQm0
ANOu5QK9owJBANgOeLfNNcF4pp+UikRFqxk5hULqRAWzVxVrWe85FlPm0VVmHbb/
loif7mqjU8o1jTd/LM7RD9f2usZyE2psaw8CQQCNLhkpX3KO5kKJmS9N7JMZSc4j
oog58yeYO8BBqKKzpug0LXuQultYv2K4veaIO04iL9VLe5z9S/Q1jaCHBBuXAkEA
z8gjGoi1AOp6PBBLZNsncCvcV/0aC+1se4HxTNo2+duKSDnbq+ljqOM+E7odU+Nq
ewvIWOG//e8fssd0mq3HywJBAJ8l/c8GVmrpFTx8r/nZ2Pyyjt3dH1widooDXYSV
q6Gbf41Llo5sYAtmxdndTLASuHKecacTgZVhy0FryZpLKrU=
-----END RSA PRIVATE KEY-----
-----BEGIN CERTIFICATE-----
MIICpzCCAhCgAwIBAgIJAP+qStv1cIGNMA0GCSqGSIb3DQEBBQUAMIGJMQswCQYD
VQQGEwJVUzERMA8GA1UECBMIRGVsYXdhcmUxEzARBgNVBAcTCldpbG1pbmd0b24x
IzAhBgNVBAoTGlB5dGhvbiBTb2Z0d2FyZSBGb3VuZGF0aW9uMQwwCgYDVQQLEwNT
U0wxHzAdBgNVBAMTFnNvbWVtYWNoaW5lLnB5dGhvbi5vcmcwHhcNMDcwODI3MTY1
NDUwWhcNMTMwMjE2MTY1NDUwWjCBiTELMAkGA1UEBhMCVVMxETAPBgNVBAgTCERl
bGF3YXJlMRMwEQYDVQQHEwpXaWxtaW5ndG9uMSMwIQYDVQQKExpQeXRob24gU29m
dHdhcmUgRm91bmRhdGlvbjEMMAoGA1UECxMDU1NMMR8wHQYDVQQDExZzb21lbWFj
aGluZS5weXRob24ub3JnMIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQC8ddrh
m+LutBvjYcQlnH21PPIseJ1JVG2HMmN2CmZk2YukO+9LopdJhTvbGfEj0DQs1IE8
M+kTUyOmuKfVrFMKwtVeCJphrAnhoz7TYOuLBSqt7lVHfhi/VwovESJlaBOp+WMn
fhcduPEYHYx/6cnVapIkZnLt30zu2um+DzA9jQIDAQABoxUwEzARBglghkgBhvhC
AQEEBAMCBkAwDQYJKoZIhvcNAQEFBQADgYEAF4Q5BVqmCOLv1n8je/Jw9K669VXb
08hyGzQhkemEBYQd6fzQ9A/1ZzHkJKb1P6yreOLSEh4KcxYPyrLRC1ll8nr5OlCx
CMhKkTnR6qBsdNV0XtdU2+N25hqW+Ma4ZeqsN/iiJVCGNOZGnvQuvCAGWF8+J/f/
iHkC6gGdBJhogs4=
-----END CERTIFICATE-----
This diff is collapsed.
-----BEGIN CERTIFICATE-----
MIIFxzCCA6+gAwIBAgIJALnlnf5uzTkIMA0GCSqGSIb3DQEBCwUAMEsxCzAJBgNV
BAYTAkRFMRcwFQYDVQQKEw5zY2hva29rZWtzLm9yZzEjMCEGCSqGSIb3DQEJARYU
aGFubm9Ac2Nob2tva2Vrcy5vcmcwHhcNMTAwMTI3MDAyMTI1WhcNMjAwMTI1MDAy
MTI1WjBLMQswCQYDVQQGEwJERTEXMBUGA1UEChMOc2Nob2tva2Vrcy5vcmcxIzAh
BgkqhkiG9w0BCQEWFGhhbm5vQHNjaG9rb2tla3Mub3JnMIICIjANBgkqhkiG9w0B
AQEFAAOCAg8AMIICCgKCAgEApJ4ODPwEooMW35dQPlBqdvcfkEvjhcsA7jmJfFqN
e/1T34zT44X9+KnMBSG2InacbD7eyFgjfaENFsZ87YkEBDIFZ/SHotLJZORQ8PUj
YoxPG4mjKN+yL2WthNcYbRyJreTbbDroNMuw6tkTSxeSXyYFQrKMCUfErVbZa/d5
RvfFVk+Au9dVUFhed/Stn5cv+a0ffvpyA7ygihm1kMFICbvPeI0846tmC2Ph7rM5
pYQyNBDOVpULODTk5Wu6jiiJJygvJWCZ1FdpsdBs5aKWHWdRhX++quGuflTTjH5d
qaIka4op9H7XksYphTDXmV+qHnva5jbPogwutDQcVsGBQcJaLmQqhsQK13bf4khE
iWJvfBLfHn8OOpY25ZwwuigJIwifNCxQeeT1FrLmyuYNhz2phPpzx065kqSUSR+A
Iw8DPE6e65UqMDKqZnID3dQeiQaFrHEV+Ibo0U/tD0YSBw5p33TMh0Es33IBWMac
m7x4hIFWdhl8W522u6qOrTswY3s8vB7blNWqMc9n7oWH8ybFf7EgKeDVtEN9AyBE
0WotXIEZWI+WvDbU1ACJXau9sQhYP/eerg7Zwr3iGUy4IQ5oUJibnjtcE+z8zmDN
pE6YcMCLJyLjXiQ3iHG9mNXzw7wPnslTbEEEukrfSlHGgW8Dm+VrNyW0JUM1bntx
vbMCAwEAAaOBrTCBqjAdBgNVHQ4EFgQUCedv7pDTuXtCxm4HTw9hUtrTvsowewYD
VR0jBHQwcoAUCedv7pDTuXtCxm4HTw9hUtrTvsqhT6RNMEsxCzAJBgNVBAYTAkRF
MRcwFQYDVQQKEw5zY2hva29rZWtzLm9yZzEjMCEGCSqGSIb3DQEJARYUaGFubm9A
c2Nob2tva2Vrcy5vcmeCCQC55Z3+bs05CDAMBgNVHRMEBTADAQH/MA0GCSqGSIb3
DQEBCwUAA4ICAQBHKAxA7WA/MEFjet03K8ouzEOr6Jrk2fZOuRhoDZ+9gr4FtaJB
P3Hh5D00kuSOvDnwsvCohxeNd1KTMAwVmVoH+NZkHERn3UXniUENlp18koI1ehlr
CZbXbzzE9Te9BelliSFA63q0cq0yJN1x9GyabU34XkAouCAmOqfSpKNZWZHGBHPF
bbYnZrHEMcsye6vKeTOcg1GqUHGrQM2WK0QaOwnCQv2RblI9VN+SeRoUJ44qTXdW
TwIYStsIPesacNcAQTStnHgKqIPx4zCwdx5xo8zONbXJfocqwyFqiAofvb9dN1nW
g1noVBcXB+oRBZW5CjFw87U88itq39i9+BWl835DWLBW2pVmx1QTLGv0RNgs/xVx
mWnjH4nNHvrjn6pRmqHZTk/SS0Hkl2qtDsynVxIl8EiMTfWSU3DBTuD2J/RSzuOE
eKtAbaoXkXE31jCl4FEZLITIZd8UkXacb9rN304tAK92L76JOAV+xOZxFRipmvx4
+A9qQXgLhtP4VaDajb44V/kCKPSA0Vm3apehke9Wl8dDtagfos1e6MxSu3EVLXRF
SP2U777V77pdMSd0f/7cerKn5FjrxW1v1FaP1oIGniMk4qQNTgA/jvvhjybsPlVA
jsfnhWGbh1voJa0RQcMiRMsxpw2P1KNOEu37W2eq/vFghVztZJQUmb5iNw==
-----END CERTIFICATE-----
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
from test import test_support
import unittest
import select
import os
import sys
class SelectTestCase(unittest.TestCase):
class Nope:
pass
class Almost:
def fileno(self):
return 'fileno'
def test_error_conditions(self):
self.assertRaises(TypeError, select.select, 1, 2, 3)
self.assertRaises(TypeError, select.select, [self.Nope()], [], [])
self.assertRaises(TypeError, select.select, [self.Almost()], [], [])
self.assertRaises(TypeError, select.select, [], [], [], "not a number")
def test_returned_list_identity(self):
if sys.platform[:3] in ('win', 'mac', 'os2'):
if test_support.verbose:
print "can't easily test on this system"
return
# See issue #8329
r, w, x = select.select([], [], [], 1)
self.assertFalse(r is w)
self.assertFalse(r is x)
self.assertFalse(w is x)
def test_select(self):
if sys.platform[:3] in ('win', 'mac', 'os2'):
if test_support.verbose:
print "can't easily test on this system"
return
cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
p = os.popen(cmd, 'r')
for tout in (0, 1, 2, 4, 8, 16) + (None,)*10:
if test_support.verbose:
print 'timeout =', tout
rfd, wfd, xfd = select.select([p], [], [], tout)
if (rfd, wfd, xfd) == ([], [], []):
continue
if (rfd, wfd, xfd) == ([p], [], []):
line = p.readline()
if test_support.verbose:
print repr(line)
if not line:
if test_support.verbose:
print 'EOF'
break
continue
self.fail('Unexpected return values from select():', rfd, wfd, xfd)
p.close()
def test_main():
test_support.run_unittest(SelectTestCase)
test_support.reap_children()
if __name__ == "__main__":
test_main()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import socket
import threading
import telnetlib
import time
from unittest import TestCase
from test import test_support
HOST = test_support.HOST
def server(evt, serv):
serv.listen(5)
evt.set()
try:
conn, addr = serv.accept()
except socket.timeout:
pass
finally:
serv.close()
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.sock.settimeout(3)
self.port = test_support.bind_port(self.sock)
threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
def tearDown(self):
self.evt.wait()
def testBasic(self):
# connects
telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close()
def testTimeoutDefault(self):
self.assertTrue(socket.getdefaulttimeout() is None)
socket.setdefaulttimeout(30)
try:
telnet = telnetlib.Telnet("localhost", self.port)
finally:
socket.setdefaulttimeout(None)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutNone(self):
# None, having other default
self.assertTrue(socket.getdefaulttimeout() is None)
socket.setdefaulttimeout(30)
try:
telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(None)
self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close()
def testTimeoutValue(self):
telnet = telnetlib.Telnet("localhost", self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutOpen(self):
telnet = telnetlib.Telnet()
telnet.open("localhost", self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def test_main(verbose=None):
test_support.run_unittest(GeneralTests)
if __name__ == '__main__':
test_main()
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
import sys
from greentest import walk_modules, BaseTestCase, main
import six
......
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment