Commit 88d36895 authored by Jason Madden's avatar Jason Madden

Update 3.6pypy tests. All pass on OSX with pypy3.6-7.3

parent b09e74ed
......@@ -854,6 +854,10 @@ if PYPY and PY3:
'test_subprocess.POSIXProcessTestCase.test_pass_fds_inheritable',
'test_subprocess.POSIXProcessTestCase.test_pipe_cloexec',
# This passes various "invalid" strings and expects a ValueError. not sure why
# we don't see errors on Linux.
'test_subprocess.ProcessTestCase.test_invalid_env',
# The below are new with 5.10.1
# These fail with 'OSError: received malformed or improperly truncated ancillary data'
'test_socket.RecvmsgSCMRightsStreamTest.testCmsgTruncLen0',
......@@ -872,12 +876,16 @@ if PYPY and PY3:
'test_ssl.ThreadedTests.test_protocol_sslv3',
'test_ssl.ThreadedTests.test_protocol_tlsv1',
'test_ssl.ThreadedTests.test_protocol_tlsv1_1',
# Similar, they fail without monkey-patching.
'test_ssl.TestPostHandshakeAuth.test_pha_no_pha_client',
'test_ssl.TestPostHandshakeAuth.test_pha_optional',
'test_ssl.TestPostHandshakeAuth.test_pha_required',
# This gets None instead of http1.1, even without gevent
'test_ssl.ThreadedTests.test_npn_protocols',
# This fails to decode a filename even without gevent,
# at least on High Sierarr.
# at least on High Sierra. Newer versions of the tests actually skip this.
'test_httpservers.SimpleHTTPServerTestCase.test_undecodable_filename',
]
......
DH Parameters: (3072 bit)
prime:
00:ff:ff:ff:ff:ff:ff:ff:ff:ad:f8:54:58:a2:bb:
4a:9a:af:dc:56:20:27:3d:3c:f1:d8:b9:c5:83:ce:
2d:36:95:a9:e1:36:41:14:64:33:fb:cc:93:9d:ce:
24:9b:3e:f9:7d:2f:e3:63:63:0c:75:d8:f6:81:b2:
02:ae:c4:61:7a:d3:df:1e:d5:d5:fd:65:61:24:33:
f5:1f:5f:06:6e:d0:85:63:65:55:3d:ed:1a:f3:b5:
57:13:5e:7f:57:c9:35:98:4f:0c:70:e0:e6:8b:77:
e2:a6:89:da:f3:ef:e8:72:1d:f1:58:a1:36:ad:e7:
35:30:ac:ca:4f:48:3a:79:7a:bc:0a:b1:82:b3:24:
fb:61:d1:08:a9:4b:b2:c8:e3:fb:b9:6a:da:b7:60:
d7:f4:68:1d:4f:42:a3:de:39:4d:f4:ae:56:ed:e7:
63:72:bb:19:0b:07:a7:c8:ee:0a:6d:70:9e:02:fc:
e1:cd:f7:e2:ec:c0:34:04:cd:28:34:2f:61:91:72:
fe:9c:e9:85:83:ff:8e:4f:12:32:ee:f2:81:83:c3:
fe:3b:1b:4c:6f:ad:73:3b:b5:fc:bc:2e:c2:20:05:
c5:8e:f1:83:7d:16:83:b2:c6:f3:4a:26:c1:b2:ef:
fa:88:6b:42:38:61:1f:cf:dc:de:35:5b:3b:65:19:
03:5b:bc:34:f4:de:f9:9c:02:38:61:b4:6f:c9:d6:
e6:c9:07:7a:d9:1d:26:91:f7:f7:ee:59:8c:b0:fa:
c1:86:d9:1c:ae:fe:13:09:85:13:92:70:b4:13:0c:
93:bc:43:79:44:f4:fd:44:52:e2:d7:4d:d3:64:f2:
e2:1e:71:f5:4b:ff:5c:ae:82:ab:9c:9d:f6:9e:e8:
6d:2b:c5:22:36:3a:0d:ab:c5:21:97:9b:0d:ea:da:
1d:bf:9a:42:d5:c4:48:4e:0a:bc:d0:6b:fa:53:dd:
ef:3c:1b:20:ee:3f:d5:9d:7c:25:e4:1d:2b:66:c6:
2e:37:ff:ff:ff:ff:ff:ff:ff:ff
generator: 2 (0x2)
recommended-private-length: 276 bits
-----BEGIN DH PARAMETERS-----
MIIBjAKCAYEA//////////+t+FRYortKmq/cViAnPTzx2LnFg84tNpWp4TZBFGQz
+8yTnc4kmz75fS/jY2MMddj2gbICrsRhetPfHtXV/WVhJDP1H18GbtCFY2VVPe0a
87VXE15/V8k1mE8McODmi3fipona8+/och3xWKE2rec1MKzKT0g6eXq8CrGCsyT7
YdEIqUuyyOP7uWrat2DX9GgdT0Kj3jlN9K5W7edjcrsZCwenyO4KbXCeAvzhzffi
7MA0BM0oNC9hkXL+nOmFg/+OTxIy7vKBg8P+OxtMb61zO7X8vC7CIAXFjvGDfRaD
ssbzSibBsu/6iGtCOGEfz9zeNVs7ZRkDW7w09N75nAI4YbRvydbmyQd62R0mkff3
7lmMsPrBhtkcrv4TCYUTknC0EwyTvEN5RPT9RFLi103TZPLiHnH1S/9croKrnJ32
nuhtK8UiNjoNq8Uhl5sN6todv5pC1cRITgq80Gv6U93vPBsg7j/VnXwl5B0rZsYu
N///////////AgECAgIBFA==
-----END DH PARAMETERS-----
......@@ -32,6 +32,9 @@ class Bunch(object):
self.started = []
self.finished = []
self._can_exit = not wait_before_exit
self.wait_thread = support.wait_threads_exit()
self.wait_thread.__enter__()
def task():
tid = threading.get_ident()
self.started.append(tid)
......@@ -41,6 +44,7 @@ class Bunch(object):
self.finished.append(tid)
while not self._can_exit:
_wait()
try:
for i in range(n):
start_new_thread(task, ())
......@@ -55,6 +59,8 @@ class Bunch(object):
def wait_for_finished(self):
while len(self.finished) < self.n:
_wait()
# Wait for threads exit
self.wait_thread.__exit__(None, None, None)
def do_finish(self):
self._can_exit = True
......@@ -222,20 +228,23 @@ class LockTests(BaseLockTests):
# Lock needs to be released before re-acquiring.
lock = self.locktype()
phase = []
def f():
lock.acquire()
phase.append(None)
lock.acquire()
phase.append(None)
start_new_thread(f, ())
while len(phase) == 0:
_wait()
_wait()
self.assertEqual(len(phase), 1)
lock.release()
while len(phase) == 1:
with support.wait_threads_exit():
start_new_thread(f, ())
while len(phase) == 0:
_wait()
_wait()
self.assertEqual(len(phase), 2)
self.assertEqual(len(phase), 1)
lock.release()
while len(phase) == 1:
_wait()
self.assertEqual(len(phase), 2)
def test_different_thread(self):
# Lock can be released from a different thread.
......@@ -306,6 +315,7 @@ class RLockTests(BaseLockTests):
self.assertRaises(RuntimeError, lock.release)
finally:
b.do_finish()
b.wait_for_finished()
def test__is_owned(self):
lock = self.locktype()
......@@ -397,12 +407,13 @@ class EventTests(BaseTestCase):
# cleared before the waiting thread is woken up.
evt = self.eventtype()
results = []
timeout = 0.250
N = 5
def f():
results.append(evt.wait(1))
results.append(evt.wait(timeout * 4))
b = Bunch(f, N)
b.wait_for_started()
time.sleep(0.5)
time.sleep(timeout)
evt.set()
evt.clear()
b.wait_for_finished()
......@@ -463,21 +474,28 @@ class ConditionTests(BaseTestCase):
# construct. In particular, it is possible that this can no longer
# be conveniently guaranteed should their implementation ever change.
N = 5
ready = []
results1 = []
results2 = []
phase_num = 0
def f():
cond.acquire()
ready.append(phase_num)
result = cond.wait()
cond.release()
results1.append((result, phase_num))
cond.acquire()
ready.append(phase_num)
result = cond.wait()
cond.release()
results2.append((result, phase_num))
b = Bunch(f, N)
b.wait_for_started()
_wait()
# first wait, to ensure all workers settle into cond.wait() before
# we continue. See issues #8799 and #30727.
while len(ready) < 5:
_wait()
ready.clear()
self.assertEqual(results1, [])
# Notify 3 threads at first
cond.acquire()
......@@ -489,9 +507,9 @@ class ConditionTests(BaseTestCase):
_wait()
self.assertEqual(results1, [(True, 1)] * 3)
self.assertEqual(results2, [])
# first wait, to ensure all workers settle into cond.wait() before
# we continue. See issue #8799
_wait()
# make sure all awaken workers settle into cond.wait()
while len(ready) < 3:
_wait()
# Notify 5 threads: they might be in their first or second wait
cond.acquire()
cond.notify(5)
......@@ -502,7 +520,9 @@ class ConditionTests(BaseTestCase):
_wait()
self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
self.assertEqual(results2, [(True, 2)] * 3)
_wait() # make sure all workers settle into cond.wait()
# make sure all workers settle into cond.wait()
while len(ready) < 5:
_wait()
# Notify all threads: they are all in their second wait
cond.acquire()
cond.notify_all()
......@@ -612,13 +632,14 @@ class BaseSemaphoreTests(BaseTestCase):
sem = self.semtype(7)
sem.acquire()
N = 10
sem_results = []
results1 = []
results2 = []
phase_num = 0
def f():
sem.acquire()
sem_results.append(sem.acquire())
results1.append(phase_num)
sem.acquire()
sem_results.append(sem.acquire())
results2.append(phase_num)
b = Bunch(f, 10)
b.wait_for_started()
......@@ -642,6 +663,7 @@ class BaseSemaphoreTests(BaseTestCase):
# Final release, to let the last thread finish
sem.release()
b.wait_for_finished()
self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
def test_try_acquire(self):
sem = self.semtype(2)
......
-----BEGIN X509 CRL-----
MIIBpjCBjwIBATANBgkqhkiG9w0BAQUFADBNMQswCQYDVQQGEwJYWTEmMCQGA1UE
MIICJjCBjwIBATANBgkqhkiG9w0BAQsFADBNMQswCQYDVQQGEwJYWTEmMCQGA1UE
CgwdUHl0aG9uIFNvZnR3YXJlIEZvdW5kYXRpb24gQ0ExFjAUBgNVBAMMDW91ci1j
YS1zZXJ2ZXIXDTEzMTEyMTE3MDg0N1oXDTIzMDkzMDE3MDg0N1qgDjAMMAoGA1Ud
FAQDAgEAMA0GCSqGSIb3DQEBBQUAA4IBAQCNJXC2mVKauEeN3LlQ3ZtM5gkH3ExH
+i4bmJjtJn497WwvvoIeUdrmVXgJQR93RtV37hZwN0SXMLlNmUZPH4rHhihayw4m
unCzVj/OhCCY7/TPjKuJ1O/0XhaLBpBVjQN7R/1ujoRKbSia/CD3vcn7Fqxzw7LK
fSRCKRGTj1CZiuxrphtFchwALXSiFDy9mr2ZKhImcyq1PydfgEzU78APpOkMQsIC
UNJ/cf3c9emzf+dUtcMEcejQ3mynBo4eIGg1EW42bz4q4hSjzQlKcBV0muw5qXhc
HOxH2iTFhQ7SrvVuK/dM14rYM4B5mSX3nRC1kNmXpS9j3wJDhuwmjHed
YS1zZXJ2ZXIXDTE4MDgyOTE0MjMxNloXDTI4MDcwNzE0MjMxNlqgDjAMMAoGA1Ud
FAQDAgEAMA0GCSqGSIb3DQEBCwUAA4IBgQCPhrtGSbuvxPAI3YWQFDB4iOWdBnVk
ugW1lsifmCsE86FfID0EwUut1SRHlksltMtcoULMEIdu8yMLWci++4ve22EEuMKT
HUc3T/wBIuQUhA7U4deFG8CZPAxRpNoK470y7dkD4OVf0Gxa6WYDl9z8mXKmWCB9
hvzqVfLWNSLTAVPsHtkD5PXdi5yRkQr6wYD7poWaIvkpsn7EKCY6Tw5V3rsbRuZq
AGVCq5TH3mctcmwLloCJ4Xr/1q0DsRrYxeeLYxE+UpvvCbVBKgtjBK7zINS7AbcJ
CYCYKUwGWv1fYKJ+KQQHf75mT3jQ9lWuzOj/YWK4k1EBnYmVGuKKt73lLFxC6h3y
MUnaBZc1KZSyJj0IxfHg/o6qx8NgKOl9XRIQ5g5B30cwpPOskGhEhodbTTY3bPtm
RQ36JvQZngzmkhyhr+MDEV5yUTOShfUiclzQOx26CmLmLHWxOZgXtFZob/oKrvbm
Gen/+7K7YTw6hfY52U7J2FuQRGOyzBXfBYQ=
-----END X509 CRL-----
......@@ -21,25 +21,19 @@ class InterProcessSignalTests(unittest.TestCase):
self.got_signals['SIGUSR1'] += 1
raise SIGUSR1Exception
def wait_signal(self, child, signame, exc_class=None):
try:
if child is not None:
# This wait should be interrupted by exc_class
# (if set)
child.wait()
timeout = 10.0
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if self.got_signals[signame]:
return
signal.pause()
except BaseException as exc:
if exc_class is not None and isinstance(exc, exc_class):
# got the expected exception
def wait_signal(self, child, signame):
if child is not None:
# This wait should be interrupted by exc_class
# (if set)
child.wait()
timeout = 10.0
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if self.got_signals[signame]:
return
raise
signal.pause()
self.fail('signal %s not received after %s seconds'
% (signame, timeout))
......@@ -65,8 +59,9 @@ class InterProcessSignalTests(unittest.TestCase):
self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 0,
'SIGALRM': 0})
with self.subprocess_send_signal(pid, "SIGUSR1") as child:
self.wait_signal(child, 'SIGUSR1', SIGUSR1Exception)
with self.assertRaises(SIGUSR1Exception):
with self.subprocess_send_signal(pid, "SIGUSR1") as child:
self.wait_signal(child, 'SIGUSR1')
self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
'SIGALRM': 0})
......@@ -74,10 +69,14 @@ class InterProcessSignalTests(unittest.TestCase):
# Nothing should happen: SIGUSR2 is ignored
child.wait()
signal.alarm(1)
self.wait_signal(None, 'SIGALRM', KeyboardInterrupt)
self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
'SIGALRM': 0})
try:
with self.assertRaises(KeyboardInterrupt):
signal.alarm(1)
self.wait_signal(None, 'SIGALRM')
self.assertEqual(self.got_signals, {'SIGHUP': 1, 'SIGUSR1': 1,
'SIGALRM': 0})
finally:
signal.alarm(0)
if __name__ == "__main__":
......
-----BEGIN CERTIFICATE-----
MIIDqDCCApKgAwIBAgIBAjALBgkqhkiG9w0BAQswHzELMAkGA1UEBhMCVUsxEDAO
BgNVBAMTB2NvZHktY2EwHhcNMTgwNjE4MTgwMDU4WhcNMjgwNjE0MTgwMDU4WjA7
MQswCQYDVQQGEwJVSzEsMCoGA1UEAxMjY29kZW5vbWljb24tdm0tMi50ZXN0Lmxh
bC5jaXNjby5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQC63fGB
J80A9Av1GB0bptslKRIUtJm8EeEu34HkDWbL6AJY0P8WfDtlXjlPaLqFa6sqH6ES
V48prSm1ZUbDSVL8R6BYVYpOlK8/48xk4pGTgRzv69gf5SGtQLwHy8UPBKgjSZoD
5a5k5wJXGswhKFFNqyyxqCvWmMnJWxXTt2XDCiWc4g4YAWi4O4+6SeeHVAV9rV7C
1wxqjzKovVe2uZOHjKEzJbbIU6JBPb6TRfMdRdYOw98n1VXDcKVgdX2DuuqjCzHP
WhU4Tw050M9NaK3eXp4Mh69VuiKoBGOLSOcS8reqHIU46Reg0hqeL8LIL6OhFHIF
j7HR6V1X6F+BfRS/AgMBAAGjgdYwgdMwCQYDVR0TBAIwADAdBgNVHQ4EFgQUOktp
HQjxDXXUg8prleY9jeLKeQ4wTwYDVR0jBEgwRoAUx6zgPygZ0ZErF9sPC4+5e2Io
UU+hI6QhMB8xCzAJBgNVBAYTAlVLMRAwDgYDVQQDEwdjb2R5LWNhggkA1QEAuwb7
2s0wCQYDVR0SBAIwADAuBgNVHREEJzAlgiNjb2Rlbm9taWNvbi12bS0yLnRlc3Qu
bGFsLmNpc2NvLmNvbTAOBgNVHQ8BAf8EBAMCBaAwCwYDVR0fBAQwAjAAMAsGCSqG
SIb3DQEBCwOCAQEAvqantx2yBlM11RoFiCfi+AfSblXPdrIrHvccepV4pYc/yO6p
t1f2dxHQb8rWH3i6cWag/EgIZx+HJQvo0rgPY1BFJsX1WnYf1/znZpkUBGbVmlJr
t/dW1gSkNS6sPsM0Q+7HPgEv8CPDNK5eo7vU2seE0iWOkxSyVUuiCEY9ZVGaLVit
p0C78nZ35Pdv4I+1cosmHl28+es1WI22rrnmdBpH8J1eY6WvUw2xuZHLeNVN0TzV
Q3qq53AaCWuLOD1AjESWuUCxMZTK9DPS4JKXTK8RLyDeqOvJGjsSWp3kL0y3GaQ+
10T1rfkKJub2+m9A9duin1fn6tHc2wSvB7m3DA==
-----END CERTIFICATE-----
......@@ -433,7 +433,10 @@ class FileWrapperTest(unittest.TestCase):
f = asyncore.file_wrapper(fd)
os.close(fd)
f.close()
os.close(f.fd) # file_wrapper dupped fd
with self.assertRaises(OSError):
f.close()
self.assertEqual(f.fd, -1)
# calling close twice should not fail
f.close()
......@@ -502,7 +505,7 @@ class BaseClient(BaseTestHandler):
class BaseTestAPI:
def tearDown(self):
asyncore.close_all()
asyncore.close_all(ignore_all=True)
def loop_waiting_for_flag(self, instance, timeout=5):
timeout = float(timeout) / 100
......@@ -755,50 +758,50 @@ class BaseTestAPI:
def test_set_reuse_addr(self):
if HAS_UNIX_SOCKETS and self.family == socket.AF_UNIX:
self.skipTest("Not applicable to AF_UNIX sockets.")
sock = socket.socket(self.family)
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except OSError:
unittest.skip("SO_REUSEADDR not supported on this platform")
else:
# if SO_REUSEADDR succeeded for sock we expect asyncore
# to do the same
s = asyncore.dispatcher(socket.socket(self.family))
self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
s.socket.close()
s.create_socket(self.family)
s.set_reuse_addr()
self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
finally:
sock.close()
with socket.socket(self.family) as sock:
try:
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except OSError:
unittest.skip("SO_REUSEADDR not supported on this platform")
else:
# if SO_REUSEADDR succeeded for sock we expect asyncore
# to do the same
s = asyncore.dispatcher(socket.socket(self.family))
self.assertFalse(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
s.socket.close()
s.create_socket(self.family)
s.set_reuse_addr()
self.assertTrue(s.socket.getsockopt(socket.SOL_SOCKET,
socket.SO_REUSEADDR))
@unittest.skipUnless(threading, 'Threading required for this test.')
@support.reap_threads
def test_quick_connect(self):
# see: http://bugs.python.org/issue10340
if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
server = BaseServer(self.family, self.addr)
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=500))
t.start()
def cleanup():
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
self.addCleanup(cleanup)
s = socket.socket(self.family, socket.SOCK_STREAM)
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except OSError:
pass
finally:
s.close()
if self.family not in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
self.skipTest("test specific to AF_INET and AF_INET6")
server = BaseServer(self.family, self.addr)
# run the thread 500 ms: the socket should be connected in 200 ms
t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
count=5))
t.start()
try:
with socket.socket(self.family, socket.SOCK_STREAM) as s:
s.settimeout(.2)
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
struct.pack('ii', 1, 0))
try:
s.connect(server.address)
except OSError:
pass
finally:
t.join(timeout=TIMEOUT)
if t.is_alive():
self.fail("join() timed out")
class TestAPI_UseIPv4Sockets(BaseTestAPI):
family = socket.AF_INET
......
......@@ -52,6 +52,7 @@ class TestServerThread(threading.Thread):
def stop(self):
self.server.shutdown()
self.join()
class BaseTestCase(unittest.TestCase):
......@@ -371,7 +372,8 @@ class SimpleHTTPServerTestCase(BaseTestCase):
reader.close()
return body
@support.requires_mac_ver(10, 5)
@unittest.skipIf(sys.platform == 'darwin',
'undecodable name cannot always be decoded on macOS')
@unittest.skipIf(sys.platform == 'win32',
'undecodable name cannot be decoded on win32')
@unittest.skipUnless(support.TESTFN_UNDECODABLE,
......
......@@ -46,28 +46,27 @@ class _TriggerThread(threading.Thread):
class BlockingTestMixin:
def tearDown(self):
self.t = None
def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
self.t = _TriggerThread(trigger_func, trigger_args)
self.t.start()
self.result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not self.t.startedEvent.is_set():
self.fail("blocking function '%r' appeared not to block" %
block_func)
self.t.join(10) # make sure the thread terminates
if self.t.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
return self.result
thread = _TriggerThread(trigger_func, trigger_args)
thread.start()
try:
self.result = block_func(*block_args)
# If block_func returned before our thread made the call, we failed!
if not thread.startedEvent.is_set():
self.fail("blocking function '%r' appeared not to block" %
block_func)
return self.result
finally:
thread.join(10) # make sure the thread terminates
if thread.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
# Call this instead if block_func is supposed to raise an exception.
def do_exceptional_blocking_test(self,block_func, block_args, trigger_func,
trigger_args, expected_exception_class):
self.t = _TriggerThread(trigger_func, trigger_args)
self.t.start()
thread = _TriggerThread(trigger_func, trigger_args)
thread.start()
try:
try:
block_func(*block_args)
......@@ -77,11 +76,11 @@ class BlockingTestMixin:
self.fail("expected exception of kind %r" %
expected_exception_class)
finally:
self.t.join(10) # make sure the thread terminates
if self.t.is_alive():
thread.join(10) # make sure the thread terminates
if thread.is_alive():
self.fail("trigger function '%r' appeared to not return" %
trigger_func)
if not self.t.startedEvent.is_set():
if not thread.startedEvent.is_set():
self.fail("trigger thread ended but event never set")
......@@ -159,8 +158,11 @@ class BaseQueueTestMixin(BlockingTestMixin):
def queue_join_test(self, q):
self.cum = 0
threads = []
for i in (0,1):
threading.Thread(target=self.worker, args=(q,)).start()
thread = threading.Thread(target=self.worker, args=(q,))
thread.start()
threads.append(thread)
for i in range(100):
q.put(i)
q.join()
......@@ -169,6 +171,8 @@ class BaseQueueTestMixin(BlockingTestMixin):
for i in (0,1):
q.put(-1) # instruct the threads to close
q.join() # verify that you can join twice
for thread in threads:
thread.join()
def test_queue_task_done(self):
# Test to make sure a queue task completed successfully.
......
......@@ -3,11 +3,13 @@ from test import support
from contextlib import closing
import enum
import gc
import os
import pickle
import random
import select
import signal
import socket
import struct
import statistics
import subprocess
import traceback
import sys, os, time, errno
......@@ -370,7 +372,6 @@ class WakeupSocketSignalTests(unittest.TestCase):
signal.signal(signum, handler)
read, write = socket.socketpair()
read.setblocking(False)
write.setblocking(False)
signal.set_wakeup_fd(write.fileno())
......@@ -615,6 +616,15 @@ class ItimerTest(unittest.TestCase):
# and the handler should have been called
self.assertEqual(self.hndl_called, True)
def test_setitimer_tiny(self):
# bpo-30807: C setitimer() takes a microsecond-resolution interval.
# Check that float -> timeval conversion doesn't round
# the interval down to zero, which would disable the timer.
self.itimer = signal.ITIMER_REAL
signal.setitimer(self.itimer, 1e-6)
time.sleep(1)
self.assertEqual(self.hndl_called, True)
class PendingSignalsTests(unittest.TestCase):
"""
......@@ -950,6 +960,135 @@ class PendingSignalsTests(unittest.TestCase):
(exitcode, stdout))
class StressTest(unittest.TestCase):
"""
Stress signal delivery, especially when a signal arrives in
the middle of recomputing the signal state or executing
previously tripped signal handlers.
"""
def setsig(self, signum, handler):
old_handler = signal.signal(signum, handler)
self.addCleanup(signal.signal, signum, old_handler)
def measure_itimer_resolution(self):
N = 20
times = []
def handler(signum=None, frame=None):
if len(times) < N:
times.append(time.perf_counter())
# 1 µs is the smallest possible timer interval,
# we want to measure what the concrete duration
# will be on this platform
signal.setitimer(signal.ITIMER_REAL, 1e-6)
self.addCleanup(signal.setitimer, signal.ITIMER_REAL, 0)
self.setsig(signal.SIGALRM, handler)
handler()
while len(times) < N:
time.sleep(1e-3)
durations = [times[i+1] - times[i] for i in range(len(times) - 1)]
med = statistics.median(durations)
if support.verbose:
print("detected median itimer() resolution: %.6f s." % (med,))
return med
def decide_itimer_count(self):
# Some systems have poor setitimer() resolution (for example
# measured around 20 ms. on FreeBSD 9), so decide on a reasonable
# number of sequential timers based on that.
reso = self.measure_itimer_resolution()
if reso <= 1e-4:
return 10000
elif reso <= 1e-2:
return 100
else:
self.skipTest("detected itimer resolution (%.3f s.) too high "
"(> 10 ms.) on this platform (or system too busy)"
% (reso,))
@unittest.skipUnless(hasattr(signal, "setitimer"),
"test needs setitimer()")
def test_stress_delivery_dependent(self):
"""
This test uses dependent signal handlers.
"""
N = self.decide_itimer_count()
sigs = []
def first_handler(signum, frame):
# 1e-6 is the minimum non-zero value for `setitimer()`.
# Choose a random delay so as to improve chances of
# triggering a race condition. Ideally the signal is received
# when inside critical signal-handling routines such as
# Py_MakePendingCalls().
signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
def second_handler(signum=None, frame=None):
sigs.append(signum)
# Here on Linux, SIGPROF > SIGALRM > SIGUSR1. By using both
# ascending and descending sequences (SIGUSR1 then SIGALRM,
# SIGPROF then SIGALRM), we maximize chances of hitting a bug.
self.setsig(signal.SIGPROF, first_handler)
self.setsig(signal.SIGUSR1, first_handler)
self.setsig(signal.SIGALRM, second_handler) # for ITIMER_REAL
expected_sigs = 0
deadline = time.time() + 15.0
while expected_sigs < N:
os.kill(os.getpid(), signal.SIGPROF)
expected_sigs += 1
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.time() < deadline:
time.sleep(1e-5)
os.kill(os.getpid(), signal.SIGUSR1)
expected_sigs += 1
while len(sigs) < expected_sigs and time.time() < deadline:
time.sleep(1e-5)
# All ITIMER_REAL signals should have been delivered to the
# Python handler
self.assertEqual(len(sigs), N, "Some signals were lost")
@unittest.skipUnless(hasattr(signal, "setitimer"),
"test needs setitimer()")
def test_stress_delivery_simultaneous(self):
"""
This test uses simultaneous signal handlers.
"""
N = self.decide_itimer_count()
sigs = []
def handler(signum, frame):
sigs.append(signum)
self.setsig(signal.SIGUSR1, handler)
self.setsig(signal.SIGALRM, handler) # for ITIMER_REAL
expected_sigs = 0
deadline = time.time() + 15.0
while expected_sigs < N:
# Hopefully the SIGALRM will be received somewhere during
# initial processing of SIGUSR1.
signal.setitimer(signal.ITIMER_REAL, 1e-6 + random.random() * 1e-5)
os.kill(os.getpid(), signal.SIGUSR1)
expected_sigs += 2
# Wait for handlers to run to avoid signal coalescing
while len(sigs) < expected_sigs and time.time() < deadline:
time.sleep(1e-5)
# All ITIMER_REAL signals should have been delivered to the
# Python handler
self.assertEqual(len(sigs), N, "Some signals were lost")
def tearDownModule():
support.reap_children()
......
......@@ -18,6 +18,11 @@ import textwrap
import unittest
from test import support, mock_socket
from unittest.mock import Mock
HOST = "localhost"
HOSTv4 = "127.0.0.1"
HOSTv6 = "::1"
try:
import threading
......@@ -569,6 +574,33 @@ class NonConnectingTests(unittest.TestCase):
"localhost:bogus")
class DefaultArgumentsTests(unittest.TestCase):
def setUp(self):
self.msg = EmailMessage()
self.msg['From'] = 'Páolo <főo@bar.com>'
self.smtp = smtplib.SMTP()
self.smtp.ehlo = Mock(return_value=(200, 'OK'))
self.smtp.has_extn, self.smtp.sendmail = Mock(), Mock()
def testSendMessage(self):
expected_mail_options = ('SMTPUTF8', 'BODY=8BITMIME')
self.smtp.send_message(self.msg)
self.smtp.send_message(self.msg)
self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
expected_mail_options)
self.assertEqual(self.smtp.sendmail.call_args_list[1][0][3],
expected_mail_options)
def testSendMessageWithMailOptions(self):
mail_options = ['STARTTLS']
expected_mail_options = ('STARTTLS', 'SMTPUTF8', 'BODY=8BITMIME')
self.smtp.send_message(self.msg, None, None, mail_options)
self.assertEqual(mail_options, ['STARTTLS'])
self.assertEqual(self.smtp.sendmail.call_args_list[0][0][3],
expected_mail_options)
# test response of client to a non-successful HELO message
@unittest.skipUnless(threading, 'Threading required for this test.')
class BadHELOServerTests(unittest.TestCase):
......@@ -604,7 +636,9 @@ class TooLongLineTests(unittest.TestCase):
self.sock.settimeout(15)
self.port = support.bind_port(self.sock)
servargs = (self.evt, self.respdata, self.sock)
threading.Thread(target=server, args=servargs).start()
thread = threading.Thread(target=server, args=servargs)
thread.start()
self.addCleanup(thread.join)
self.evt.wait()
self.evt.clear()
......@@ -733,7 +767,7 @@ class SimSMTPChannel(smtpd.SMTPChannel):
try:
user, hashed_pass = logpass.split()
except ValueError as e:
self.push('535 Splitting response {!r} into user and password'
self.push('535 Splitting response {!r} into user and password '
'failed: {}'.format(logpass, e))
return False
valid_hashed_pass = hmac.HMAC(
......@@ -816,6 +850,7 @@ class SimSMTPServer(smtpd.SMTPServer):
def __init__(self, *args, **kw):
self._extra_features = []
self._addresses = {}
smtpd.SMTPServer.__init__(self, *args, **kw)
def handle_accepted(self, conn, addr):
......@@ -824,7 +859,8 @@ class SimSMTPServer(smtpd.SMTPServer):
decode_data=self._decode_data)
def process_message(self, peer, mailfrom, rcpttos, data):
pass
self._addresses['from'] = mailfrom
self._addresses['tos'] = rcpttos
def add_feature(self, feature):
self._extra_features.append(feature)
......@@ -1064,6 +1100,34 @@ class SMTPSimTests(unittest.TestCase):
self.assertRaises(UnicodeEncodeError, smtp.sendmail, 'Alice', 'Böb', '')
self.assertRaises(UnicodeEncodeError, smtp.mail, 'Älice')
def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
# This test is located here and not in the SMTPUTF8SimTests
# class because it needs a "regular" SMTP server to work
msg = EmailMessage()
msg['From'] = "Páolo <főo@bar.com>"
msg['To'] = 'Dinsdale'
msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
with self.assertRaises(smtplib.SMTPNotSupportedError):
smtp.send_message(msg)
def test_name_field_not_included_in_envelop_addresses(self):
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3
)
self.addCleanup(smtp.close)
message = EmailMessage()
message['From'] = email.utils.formataddr(('Michaël', 'michael@example.com'))
message['To'] = email.utils.formataddr(('René', 'rene@example.com'))
self.assertDictEqual(smtp.send_message(message), {})
self.assertEqual(self.serv._addresses['from'], 'michael@example.com')
self.assertEqual(self.serv._addresses['tos'], ['rene@example.com'])
class SimSMTPUTF8Server(SimSMTPServer):
......@@ -1194,17 +1258,6 @@ class SMTPUTF8SimTests(unittest.TestCase):
self.assertIn('SMTPUTF8', self.serv.last_mail_options)
self.assertEqual(self.serv.last_rcpt_options, [])
def test_send_message_error_on_non_ascii_addrs_if_no_smtputf8(self):
msg = EmailMessage()
msg['From'] = "Páolo <főo@bar.com>"
msg['To'] = 'Dinsdale'
msg['Subject'] = 'Nudge nudge, wink, wink \u1F609'
smtp = smtplib.SMTP(
HOST, self.port, local_hostname='localhost', timeout=3)
self.addCleanup(smtp.close)
self.assertRaises(smtplib.SMTPNotSupportedError,
smtp.send_message(msg))
EXPECTED_RESPONSE = encode_base64(b'\0psu\0doesnotexist', eol='')
......@@ -1273,18 +1326,5 @@ class SMTPAUTHInitialResponseSimTests(unittest.TestCase):
self.assertEqual(code, 235)
@support.reap_threads
def test_main(verbose=None):
support.run_unittest(
BadHELOServerTests,
DebuggingServerTests,
GeneralTests,
NonConnectingTests,
SMTPAUTHInitialResponseSimTests,
SMTPSimTests,
TooLongLineTests,
)
if __name__ == '__main__':
test_main()
unittest.main()
This diff is collapsed.
......@@ -48,11 +48,11 @@ def receive(sock, n, timeout=20):
if HAVE_UNIX_SOCKETS and HAVE_FORKING:
class ForkingUnixStreamServer(socketserver.ForkingMixIn,
socketserver.UnixStreamServer):
pass
_block_on_close = True
class ForkingUnixDatagramServer(socketserver.ForkingMixIn,
socketserver.UnixDatagramServer):
pass
_block_on_close = True
@contextlib.contextmanager
......@@ -62,10 +62,14 @@ def simple_subprocess(testcase):
if pid == 0:
# Don't raise an exception; it would be caught by the test harness.
os._exit(72)
yield None
pid2, status = os.waitpid(pid, 0)
testcase.assertEqual(pid2, pid)
testcase.assertEqual(72 << 8, status)
try:
yield None
except:
raise
finally:
pid2, status = os.waitpid(pid, 0)
testcase.assertEqual(pid2, pid)
testcase.assertEqual(72 << 8, status)
@unittest.skipUnless(threading, 'Threading required for this test.')
......@@ -101,6 +105,8 @@ class SocketServerTest(unittest.TestCase):
def make_server(self, addr, svrcls, hdlrbase):
class MyServer(svrcls):
_block_on_close = True
def handle_error(self, request, client_address):
self.close_request(request)
raise
......@@ -144,6 +150,10 @@ class SocketServerTest(unittest.TestCase):
t.join()
server.server_close()
self.assertEqual(-1, server.socket.fileno())
if HAVE_FORKING and isinstance(server, socketserver.ForkingMixIn):
# bpo-31151: Check that ForkingMixIn.server_close() waits until
# all children completed
self.assertFalse(server.active_children)
if verbose: print("done")
def stream_examine(self, proto, addr):
......@@ -292,6 +302,7 @@ class ErrorHandlerTest(unittest.TestCase):
def tearDown(self):
test.support.unlink(test.support.TESTFN)
reap_children()
def test_sync_handled(self):
BaseErrorTestServer(ValueError)
......@@ -329,6 +340,8 @@ class ErrorHandlerTest(unittest.TestCase):
class BaseErrorTestServer(socketserver.TCPServer):
_block_on_close = True
def __init__(self, exception):
self.exception = exception
super().__init__((HOST, 0), BadHandler)
......@@ -371,10 +384,7 @@ class ThreadingErrorTestServer(socketserver.ThreadingMixIn,
if HAVE_FORKING:
class ForkingErrorTestServer(socketserver.ForkingMixIn, BaseErrorTestServer):
def wait_done(self):
[child] = self.active_children
os.waitpid(child, 0)
self.active_children.clear()
_block_on_close = True
class SocketWriterTest(unittest.TestCase):
......
This diff is collapsed.
This diff is collapsed.
......@@ -398,5 +398,4 @@ class ExpectTests(ExpectAndReadTestCase):
if __name__ == '__main__':
import unittest
unittest.main()
......@@ -11,6 +11,7 @@ from test import lock_tests
NUMTASKS = 10
NUMTRIPS = 3
POLL_SLEEP = 0.010 # seconds = 10 ms
_print_mutex = thread.allocate_lock()
......@@ -20,6 +21,7 @@ def verbose_print(arg):
with _print_mutex:
print(arg)
class BasicThreadTest(unittest.TestCase):
def setUp(self):
......@@ -31,6 +33,9 @@ class BasicThreadTest(unittest.TestCase):
self.running = 0
self.next_ident = 0
key = support.threading_setup()
self.addCleanup(support.threading_cleanup, *key)
class ThreadRunningTests(BasicThreadTest):
......@@ -54,12 +59,13 @@ class ThreadRunningTests(BasicThreadTest):
self.done_mutex.release()
def test_starting_threads(self):
# Basic test for thread creation.
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for tasks to complete...")
self.done_mutex.acquire()
verbose_print("all tasks done")
with support.wait_threads_exit():
# Basic test for thread creation.
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for tasks to complete...")
self.done_mutex.acquire()
verbose_print("all tasks done")
def test_stack_size(self):
# Various stack size tests.
......@@ -89,12 +95,13 @@ class ThreadRunningTests(BasicThreadTest):
verbose_print("trying stack_size = (%d)" % tss)
self.next_ident = 0
self.created = 0
for i in range(NUMTASKS):
self.newtask()
with support.wait_threads_exit():
for i in range(NUMTASKS):
self.newtask()
verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")
verbose_print("waiting for all tasks to complete")
self.done_mutex.acquire()
verbose_print("all tasks done")
thread.stack_size(0)
......@@ -104,26 +111,29 @@ class ThreadRunningTests(BasicThreadTest):
mut = thread.allocate_lock()
mut.acquire()
started = []
def task():
started.append(None)
mut.acquire()
mut.release()
thread.start_new_thread(task, ())
while not started:
time.sleep(0.01)
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
# The only reliable way to be sure that the thread ended from the
# interpreter's point of view is to wait for the function object to be
# destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
time.sleep(0.01)
support.gc_collect()
self.assertEqual(thread._count(), orig)
with support.wait_threads_exit():
thread.start_new_thread(task, ())
while not started:
time.sleep(POLL_SLEEP)
self.assertEqual(thread._count(), orig + 1)
# Allow the task to finish.
mut.release()
# The only reliable way to be sure that the thread ended from the
# interpreter's point of view is to wait for the function object to be
# destroyed.
done = []
wr = weakref.ref(task, lambda _: done.append(None))
del task
while not done:
time.sleep(POLL_SLEEP)
support.gc_collect()
self.assertEqual(thread._count(), orig)
def test_save_exception_state_on_error(self):
# See issue #14474
......@@ -136,16 +146,14 @@ class ThreadRunningTests(BasicThreadTest):
except ValueError:
pass
real_write(self, *args)
c = thread._count()
started = thread.allocate_lock()
with support.captured_output("stderr") as stderr:
real_write = stderr.write
stderr.write = mywrite
started.acquire()
thread.start_new_thread(task, ())
started.acquire()
while thread._count() > c:
time.sleep(0.01)
with support.wait_threads_exit():
thread.start_new_thread(task, ())
started.acquire()
self.assertIn("Traceback", stderr.getvalue())
......@@ -177,13 +185,14 @@ class Barrier:
class BarrierTest(BasicThreadTest):
def test_barrier(self):
self.bar = Barrier(NUMTASKS)
self.running = NUMTASKS
for i in range(NUMTASKS):
thread.start_new_thread(self.task2, (i,))
verbose_print("waiting for tasks to end")
self.done_mutex.acquire()
verbose_print("tasks done")
with support.wait_threads_exit():
self.bar = Barrier(NUMTASKS)
self.running = NUMTASKS
for i in range(NUMTASKS):
thread.start_new_thread(self.task2, (i,))
verbose_print("waiting for tasks to end")
self.done_mutex.acquire()
verbose_print("tasks done")
def task2(self, ident):
for i in range(NUMTRIPS):
......@@ -225,28 +234,33 @@ class TestForkInThread(unittest.TestCase):
def setUp(self):
self.read_fd, self.write_fd = os.pipe()
@unittest.skipIf(sys.platform.startswith('win'),
"This test is only appropriate for POSIX-like systems.")
@unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
@support.reap_threads
def test_forkinthread(self):
status = "not set"
def thread1():
try:
pid = os.fork() # fork in a thread
except RuntimeError:
os._exit(1) # exit the child
nonlocal status
if pid == 0: # child
# fork in a thread
pid = os.fork()
if pid == 0:
# child
try:
os.close(self.read_fd)
os.write(self.write_fd, b"OK")
finally:
os._exit(0)
else: # parent
else:
# parent
os.close(self.write_fd)
pid, status = os.waitpid(pid, 0)
thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
with support.wait_threads_exit():
thread.start_new_thread(thread1, ())
self.assertEqual(os.read(self.read_fd, 2), b"OK",
"Unable to fork() in thread")
self.assertEqual(status, 0)
def tearDown(self):
try:
......
......@@ -125,9 +125,10 @@ class ThreadTests(BaseTestCase):
done.set()
done = threading.Event()
ident = []
_thread.start_new_thread(f, ())
done.wait()
self.assertIsNotNone(ident[0])
with support.wait_threads_exit():
tid = _thread.start_new_thread(f, ())
done.wait()
self.assertEqual(ident[0], tid)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
......@@ -165,9 +166,10 @@ class ThreadTests(BaseTestCase):
mutex = threading.Lock()
mutex.acquire()
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
with support.wait_threads_exit():
tid = _thread.start_new_thread(f, (mutex,))
# Wait for the thread to finish.
mutex.acquire()
self.assertIn(tid, threading._active)
self.assertIsInstance(threading._active[tid], threading._DummyThread)
#Issue 29376
......@@ -483,13 +485,15 @@ class ThreadTests(BaseTestCase):
for i in range(20):
t = threading.Thread(target=lambda: None)
t.start()
self.addCleanup(t.join)
pid = os.fork()
if pid == 0:
os._exit(1 if t.is_alive() else 0)
os._exit(11 if t.is_alive() else 10)
else:
t.join()
pid, status = os.waitpid(pid, 0)
self.assertEqual(0, status)
self.assertTrue(os.WIFEXITED(status))
self.assertEqual(10, os.WEXITSTATUS(status))
def test_main_thread(self):
main = threading.main_thread()
......@@ -553,6 +557,37 @@ class ThreadTests(BaseTestCase):
self.assertEqual(err, b"")
self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
@test.support.cpython_only
@requires_type_collecting
def test_main_thread_during_shutdown(self):
# bpo-31516: current_thread() should still point to the main thread
# at shutdown
code = """if 1:
import gc, threading
main_thread = threading.current_thread()
assert main_thread is threading.main_thread() # sanity check
class RefCycle:
def __init__(self):
self.cycle = self
def __del__(self):
print("GC:",
threading.current_thread() is main_thread,
threading.main_thread() is main_thread,
threading.enumerate() == [main_thread])
RefCycle()
gc.collect() # sanity check
x = RefCycle()
"""
_, out, err = assert_python_ok("-c", code)
data = out.decode()
self.assertEqual(err, b"")
self.assertEqual(data.splitlines(),
["GC: True True True"] * 2)
def test_tstate_lock(self):
# Test an implementation detail of Thread objects.
started = _thread.allocate_lock()
......@@ -586,6 +621,7 @@ class ThreadTests(BaseTestCase):
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertIsNone(t._tstate_lock)
t.join()
def test_repr_stopped(self):
# Verify that "stopped" shows up in repr(Thread) appropriately.
......@@ -612,6 +648,7 @@ class ThreadTests(BaseTestCase):
break
time.sleep(0.01)
self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
t.join()
def test_BoundedSemaphore_limit(self):
# BoundedSemaphore should raise ValueError if released too often.
......@@ -928,6 +965,7 @@ class ThreadingExceptionTests(BaseTestCase):
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, thread.start)
thread.join()
def test_joining_current_thread(self):
current_thread = threading.current_thread()
......@@ -941,6 +979,7 @@ class ThreadingExceptionTests(BaseTestCase):
thread = threading.Thread()
thread.start()
self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
thread.join()
def test_releasing_unacquired_lock(self):
lock = threading.Lock()
......@@ -1079,6 +1118,8 @@ class ThreadingExceptionTests(BaseTestCase):
thread.join()
self.assertIsNotNone(thread.exc)
self.assertIsInstance(thread.exc, RuntimeError)
# explicitly break the reference cycle to not leak a dangling thread
thread.exc = None
class TimerTests(BaseTestCase):
......@@ -1101,6 +1142,8 @@ class TimerTests(BaseTestCase):
self.callback_event.wait()
self.assertEqual(len(self.callback_args), 2)
self.assertEqual(self.callback_args, [((), {}), ((), {})])
timer1.join()
timer2.join()
def _callback_spy(self, *args, **kwargs):
self.callback_args.append((args[:], kwargs.copy()))
......@@ -1127,10 +1170,6 @@ class CRLockTests(lock_tests.RLockTests):
class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
@unittest.skip("not on gevent")
def test_reset_internal_locks(self):
pass
class ConditionAsRLockTests(lock_tests.RLockTests):
# Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
......
......@@ -16,6 +16,7 @@ except ImportError:
ssl = None
import sys
import tempfile
import warnings
from nturl2path import url2pathname, pathname2url
from base64 import b64encode
......@@ -206,6 +207,7 @@ class urlopen_FileTests(unittest.TestCase):
def test_relativelocalfile(self):
self.assertRaises(ValueError,urllib.request.urlopen,'./' + self.pathname)
class ProxyTests(unittest.TestCase):
def setUp(self):
......@@ -259,6 +261,7 @@ class ProxyTests(unittest.TestCase):
self.assertFalse(bypass('newdomain.com')) # no port
self.assertFalse(bypass('newdomain.com:1235')) # wrong port
class ProxyTests_withOrderedEnv(unittest.TestCase):
def setUp(self):
......@@ -294,6 +297,7 @@ class ProxyTests_withOrderedEnv(unittest.TestCase):
proxies = urllib.request.getproxies_environment()
self.assertEqual('http://somewhere:3128', proxies['http'])
class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
"""Test urlopen() opening a fake http connection."""
......@@ -326,6 +330,59 @@ class urlopen_HttpTests(unittest.TestCase, FakeHTTPMixin, FakeFTPMixin):
finally:
self.unfakehttp()
@unittest.skipUnless(ssl, "ssl module required")
def test_url_with_control_char_rejected(self):
for char_no in list(range(0, 0x21)) + [0x7f]:
char = chr(char_no)
schemeless_url = f"//localhost:7777/test{char}/"
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.")
try:
# We explicitly test urllib.request.urlopen() instead of the top
# level 'def urlopen()' function defined in this... (quite ugly)
# test suite. They use different url opening codepaths. Plain
# urlopen uses FancyURLOpener which goes via a codepath that
# calls urllib.parse.quote() on the URL which makes all of the
# above attempts at injection within the url _path_ safe.
escaped_char_repr = repr(char).replace('\\', r'\\')
InvalidURL = http.client.InvalidURL
with self.assertRaisesRegex(
InvalidURL, f"contain control.*{escaped_char_repr}"):
urllib.request.urlopen(f"http:{schemeless_url}")
with self.assertRaisesRegex(
InvalidURL, f"contain control.*{escaped_char_repr}"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(char, resp.geturl())
finally:
self.unfakehttp()
@unittest.skipUnless(ssl, "ssl module required")
def test_url_with_newline_header_injection_rejected(self):
self.fakehttp(b"HTTP/1.1 200 OK\r\n\r\nHello.")
host = "localhost:7777?a=1 HTTP/1.1\r\nX-injected: header\r\nTEST: 123"
schemeless_url = "//" + host + ":8080/test/?test=a"
try:
# We explicitly test urllib.request.urlopen() instead of the top
# level 'def urlopen()' function defined in this... (quite ugly)
# test suite. They use different url opening codepaths. Plain
# urlopen uses FancyURLOpener which goes via a codepath that
# calls urllib.parse.quote() on the URL which makes all of the
# above attempts at injection within the url _path_ safe.
InvalidURL = http.client.InvalidURL
with self.assertRaisesRegex(
InvalidURL, r"contain control.*\\r.*(found at least . .)"):
urllib.request.urlopen(f"http:{schemeless_url}")
with self.assertRaisesRegex(InvalidURL, r"contain control.*\\n"):
urllib.request.urlopen(f"https:{schemeless_url}")
# This code path quotes the URL so there is no injection.
resp = urlopen(f"http:{schemeless_url}")
self.assertNotIn(' ', resp.geturl())
self.assertNotIn('\r', resp.geturl())
self.assertNotIn('\n', resp.geturl())
finally:
self.unfakehttp()
def test_read_0_9(self):
# "0.9" response accepted (but not "simple responses" without
# a status line)
......@@ -432,7 +489,6 @@ Connection: close
finally:
self.unfakeftp()
def test_userpass_inurl(self):
self.fakehttp(b"HTTP/1.0 200 OK\r\n\r\nHello!")
try:
......@@ -476,6 +532,7 @@ Connection: close
"https://localhost", cafile="/nonexistent/path", context=context
)
class urlopen_DataTests(unittest.TestCase):
"""Test urlopen() opening a data URL."""
......@@ -549,6 +606,7 @@ class urlopen_DataTests(unittest.TestCase):
# missing padding character
self.assertRaises(ValueError,urllib.request.urlopen,'data:;base64,Cg=')
class urlretrieve_FileTests(unittest.TestCase):
"""Test urllib.urlretrieve() on local files"""
......@@ -1406,6 +1464,23 @@ class URLopener_Tests(unittest.TestCase):
"spam://c:|windows%/:=&?~#+!$,;'@()*[]|/path/"),
"//c:|windows%/:=&?~#+!$,;'@()*[]|/path/")
def test_local_file_open(self):
# bpo-35907, CVE-2019-9948: urllib must reject local_file:// scheme
class DummyURLopener(urllib.request.URLopener):
def open_local_file(self, url):
return url
with warnings.catch_warnings(record=True):
warnings.simplefilter("ignore", DeprecationWarning)
for url in ('local_file://example', 'local-file://example'):
self.assertRaises(OSError, urllib.request.urlopen, url)
self.assertRaises(OSError, urllib.request.URLopener().open, url)
self.assertRaises(OSError, urllib.request.URLopener().retrieve, url)
self.assertRaises(OSError, DummyURLopener().open, url)
self.assertRaises(OSError, DummyURLopener().retrieve, url)
# Just commented them out.
# Can't really tell why keep failing in windows and sparc.
# Everywhere else they work ok, but on those machines, sometimes
......
......@@ -141,44 +141,55 @@ class RequestHdrsTests(unittest.TestCase):
mgr = urllib.request.HTTPPasswordMgr()
add = mgr.add_password
find_user_pass = mgr.find_user_password
add("Some Realm", "http://example.com/", "joe", "password")
add("Some Realm", "http://example.com/ni", "ni", "ni")
add("c", "http://example.com/foo", "foo", "ni")
add("c", "http://example.com/bar", "bar", "nini")
add("b", "http://example.com/", "first", "blah")
add("b", "http://example.com/", "second", "spam")
add("a", "http://example.com", "1", "a")
add("Some Realm", "http://c.example.com:3128", "3", "c")
add("Some Realm", "d.example.com", "4", "d")
add("Some Realm", "e.example.com:3128", "5", "e")
# For the same realm, password set the highest path is the winner.
self.assertEqual(find_user_pass("Some Realm", "example.com"),
('joe', 'password'))
#self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"),
# ('ni', 'ni'))
self.assertEqual(find_user_pass("Some Realm", "http://example.com/ni"),
('joe', 'password'))
self.assertEqual(find_user_pass("Some Realm", "http://example.com"),
('joe', 'password'))
self.assertEqual(find_user_pass("Some Realm", "http://example.com/"),
('joe', 'password'))
self.assertEqual(
find_user_pass("Some Realm", "http://example.com/spam"),
('joe', 'password'))
self.assertEqual(
find_user_pass("Some Realm", "http://example.com/spam/spam"),
('joe', 'password'))
self.assertEqual(find_user_pass("Some Realm",
"http://example.com/spam"),
('joe', 'password'))
self.assertEqual(find_user_pass("Some Realm",
"http://example.com/spam/spam"),
('joe', 'password'))
# You can have different passwords for different paths.
add("c", "http://example.com/foo", "foo", "ni")
add("c", "http://example.com/bar", "bar", "nini")
self.assertEqual(find_user_pass("c", "http://example.com/foo"),
('foo', 'ni'))
self.assertEqual(find_user_pass("c", "http://example.com/bar"),
('bar', 'nini'))
# For the same path, newer password should be considered.
add("b", "http://example.com/", "first", "blah")
add("b", "http://example.com/", "second", "spam")
self.assertEqual(find_user_pass("b", "http://example.com/"),
('second', 'spam'))
# No special relationship between a.example.com and example.com:
add("a", "http://example.com", "1", "a")
self.assertEqual(find_user_pass("a", "http://example.com/"),
('1', 'a'))
self.assertEqual(find_user_pass("a", "http://a.example.com/"),
(None, None))
......@@ -830,7 +841,6 @@ class HandlerTests(unittest.TestCase):
for url, ftp in [
("file://ftp.example.com//foo.txt", False),
("file://ftp.example.com///foo.txt", False),
# XXXX bug: fails with OSError, should be URLError
("file://ftp.example.com/foo.txt", False),
("file://somehost//foo/something.txt", False),
("file://localhost//foo/something.txt", False),
......@@ -838,8 +848,7 @@ class HandlerTests(unittest.TestCase):
req = Request(url)
try:
h.file_open(req)
# XXXX remove OSError when bug fixed
except (urllib.error.URLError, OSError):
except urllib.error.URLError:
self.assertFalse(ftp)
else:
self.assertIs(o.req, req)
......@@ -1414,7 +1423,6 @@ class HandlerTests(unittest.TestCase):
self.assertEqual(req.host, "proxy.example.com:3128")
self.assertEqual(req.get_header("Proxy-authorization"), "FooBar")
# TODO: This should be only for OSX
@unittest.skipUnless(sys.platform == 'darwin', "only relevant for OSX")
def test_osx_proxy_bypass(self):
bypass = {
......@@ -1690,7 +1698,6 @@ class HandlerTests(unittest.TestCase):
self.assertTrue(conn.fakesock.closed, "Connection not closed")
class MiscTests(unittest.TestCase):
def opener_has_handler(self, opener, handler_class):
......
......@@ -289,11 +289,15 @@ class BasicAuthTests(unittest.TestCase):
def http_server_with_basic_auth_handler(*args, **kwargs):
return BasicAuthHandler(*args, **kwargs)
self.server = LoopbackHttpServerThread(http_server_with_basic_auth_handler)
self.addCleanup(self.server.stop)
self.addCleanup(self.stop_server)
self.server_url = 'http://127.0.0.1:%s' % self.server.port
self.server.start()
self.server.ready.wait()
def stop_server(self):
self.server.stop()
self.server = None
def tearDown(self):
super(BasicAuthTests, self).tearDown()
......@@ -304,7 +308,7 @@ class BasicAuthTests(unittest.TestCase):
try:
self.assertTrue(urllib.request.urlopen(self.server_url))
except urllib.error.HTTPError:
self.fail("Basic auth failed for the url: %s", self.server_url)
self.fail("Basic auth failed for the url: %s" % self.server_url)
def test_basic_auth_httperror(self):
ah = urllib.request.HTTPBasicAuthHandler()
......@@ -339,6 +343,7 @@ class ProxyAuthTests(unittest.TestCase):
return FakeProxyHandler(self.digest_auth_handler, *args, **kwargs)
self.server = LoopbackHttpServerThread(create_fake_proxy_handler)
self.addCleanup(self.stop_server)
self.server.start()
self.server.ready.wait()
proxy_url = "http://127.0.0.1:%d" % self.server.port
......@@ -347,9 +352,9 @@ class ProxyAuthTests(unittest.TestCase):
self.opener = urllib.request.build_opener(
handler, self.proxy_digest_handler)
def tearDown(self):
def stop_server(self):
self.server.stop()
super(ProxyAuthTests, self).tearDown()
self.server = None
def test_proxy_with_bad_password_raises_httperror(self):
self.proxy_digest_handler.add_password(self.REALM, self.URL,
......@@ -468,13 +473,17 @@ class TestUrlopen(unittest.TestCase):
f.close()
return b"".join(l)
def stop_server(self):
self.server.stop()
self.server = None
def start_server(self, responses=None):
if responses is None:
responses = [(200, [], b"we don't care")]
handler = GetRequestHandler(responses)
self.server = LoopbackHttpServerThread(handler)
self.addCleanup(self.server.stop)
self.addCleanup(self.stop_server)
self.server.start()
self.server.ready.wait()
port = self.server.port
......@@ -589,7 +598,7 @@ class TestUrlopen(unittest.TestCase):
def cb_sni(ssl_sock, server_name, initial_context):
nonlocal sni_name
sni_name = server_name
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
context = ssl.SSLContext(ssl.PROTOCOL_TLS)
context.set_servername_callback(cb_sni)
handler = self.start_https_server(context=context, certfile=CERT_localhost)
context = ssl.create_default_context(cafile=CERT_localhost)
......@@ -664,7 +673,7 @@ def setUpModule():
def tearDownModule():
if threads_key:
support.threading_cleanup(threads_key)
support.threading_cleanup(*threads_key)
if __name__ == "__main__":
unittest.main()
......@@ -27,6 +27,13 @@ def _wrap_with_retry_thrice(func, exc):
return _retry_thrice(func, exc, *args, **kwargs)
return wrapped
# bpo-35411: FTP tests of test_urllib2net randomly fail
# with "425 Security: Bad IP connecting" on Travis CI
skip_ftp_test_on_travis = unittest.skipIf('TRAVIS' in os.environ,
'bpo-35411: skip FTP test '
'on Travis CI')
# Connecting to remote hosts is flaky. Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib.request.urlopen,
......@@ -95,10 +102,11 @@ class OtherNetworkTests(unittest.TestCase):
# XXX The rest of these tests aren't very good -- they don't check much.
# They do sometimes catch some major disasters, though.
@skip_ftp_test_on_travis
def test_ftp(self):
urls = [
'ftp://ftp.debian.org/debian/README',
('ftp://ftp.debian.org/debian/non-existent-file',
'ftp://www.pythontest.net/README',
('ftp://www.pythontest.net/non-existent-file',
None, urllib.error.URLError),
]
self._test_urls(urls, self._extra_handlers())
......@@ -177,6 +185,7 @@ class OtherNetworkTests(unittest.TestCase):
opener.open(request)
self.assertEqual(request.get_header('User-agent'),'Test-Agent')
@unittest.skip('XXX: http://www.imdb.com is gone')
def test_sites_no_connection_close(self):
# Some sites do not send Connection: close header.
# Verify that those work properly. (#issue12576)
......@@ -287,8 +296,9 @@ class TimeoutTest(unittest.TestCase):
self.addCleanup(u.close)
self.assertEqual(u.fp.raw._sock.gettimeout(), 120)
FTP_HOST = 'ftp://ftp.debian.org/debian/'
FTP_HOST = 'ftp://www.pythontest.net/'
@skip_ftp_test_on_travis
def test_ftp_basic(self):
self.assertIsNone(socket.getdefaulttimeout())
with support.transient_internet(self.FTP_HOST, timeout=None):
......@@ -296,6 +306,7 @@ class TimeoutTest(unittest.TestCase):
self.addCleanup(u.close)
self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
@skip_ftp_test_on_travis
def test_ftp_default_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
with support.transient_internet(self.FTP_HOST):
......@@ -307,6 +318,7 @@ class TimeoutTest(unittest.TestCase):
socket.setdefaulttimeout(None)
self.assertEqual(u.fp.fp.raw._sock.gettimeout(), 60)
@skip_ftp_test_on_travis
def test_ftp_no_timeout(self):
self.assertIsNone(socket.getdefaulttimeout())
with support.transient_internet(self.FTP_HOST):
......@@ -318,6 +330,7 @@ class TimeoutTest(unittest.TestCase):
socket.setdefaulttimeout(None)
self.assertIsNone(u.fp.fp.raw._sock.gettimeout())
@skip_ftp_test_on_travis
def test_ftp_timeout(self):
with support.transient_internet(self.FTP_HOST):
u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment