Commit 689d91db authored by Jason Madden's avatar Jason Madden

Remove special code for python 2.5 since we don't support that anymore.

parent f7e9f654
......@@ -12,23 +12,12 @@ threading.Lock = Lock
threading.RLock = RLock
threading.Semaphore = Semaphore
threading.BoundedSemaphore = BoundedSemaphore
if not hasattr(threading, 'current_thread'):
threading.current_thread = threading.currentThread
if not hasattr(threading.Thread, 'name'):
threading.Thread.name = property(lambda self: self.getName())
if not hasattr(threading.Thread, 'is_alive'):
threading.Thread.is_alive = threading.Thread.isAlive
if not hasattr(threading.Thread, 'daemon'):
threading.Thread.daemon = property(threading.Thread.isDaemon, threading.Thread.setDaemon)
if hasattr(threading, '_Condition') and not hasattr(threading._Condition, 'notify_all'):
threading._Condition.notify_all = threading._Condition.notifyAll
'''
exec(setup_)
setup_3 = '\n'.join(' %s' % line for line in setup_.split('\n'))
setup_4 = '\n'.join(' %s' % line for line in setup_.split('\n'))
setup_5 = '\n'.join(' %s' % line for line in setup_.split('\n'))
try:
......@@ -140,21 +129,29 @@ class ThreadTests(unittest.TestCase):
print('all tasks done')
self.assertEqual(numrunning.get(), 0)
if sys.version_info[:2] > (2, 5):
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads,
# as must the repr and str.
t = threading.currentThread()
self.assertFalse(t.ident is None)
str(t)
repr(t)
def f():
t = threading.currentThread()
ident.append(t.ident)
str(t)
repr(t)
done.set()
done = threading.Event()
ident = []
thread.start_new_thread(f, ())
done.wait()
self.assertFalse(ident[0] is None)
# Kill the "immortal" _DummyThread
del threading._active[ident[0]]
# run with a small(ish) thread stack size (256kB)
def test_various_ops_small_stack(self):
......@@ -271,98 +268,95 @@ class ThreadTests(unittest.TestCase):
t.join()
# else the thread is still running, and we have no way to kill it
if sys.version_info[:2] > (2, 5):
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
if sys.version_info[:2] > (2, 5):
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
try:
import ctypes
getattr(ctypes, 'pythonapi') # not available on PyPy
except (ImportError,AttributeError):
if verbose:
print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything
def test_limbo_cleanup(self):
# Issue 7481: Failure to start thread should cleanup the limbo map.
def fail_new_thread(*args):
raise thread.error()
_start_new_thread = threading._start_new_thread
threading._start_new_thread = fail_new_thread
try:
t = threading.Thread(target=lambda: None)
self.assertRaises(thread.error, t.start)
self.assertFalse(
t in threading._limbo,
"Failed to cleanup _limbo map on failure of Thread.start().")
finally:
threading._start_new_thread = _start_new_thread
del ctypes # pyflakes fix
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
try:
import ctypes
getattr(ctypes, 'pythonapi') # not available on PyPy
except (ImportError,AttributeError):
if verbose:
print("test_finalize_with_runnning_thread can't import ctypes")
return # can't do anything
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
del ctypes # pyflakes fix
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
%s
import ctypes, sys, time
try:
import thread
except ImportError:
import _thread as thread # Py3
# This lock is used as a simple event variable.
ready = thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
""" % setup_4])
self.assertEqual(rc, 42)
if sys.version_info[:2] > (2, 5):
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
import ctypes, sys, time
try:
import thread
except ImportError:
import _thread as thread # Py3
# This lock is used as a simple event variable.
ready = thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
""" % setup_3])
self.assertEqual(rc, 42)
def test_join_nondaemon_on_shutdown(self):
# Issue 1722344
# Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
%s
import threading
from time import sleep
def child():
sleep(1)
# As a non-daemon thread we SHOULD wake up and nothing
# should be torn down yet
print("Woke up, sleep function is: %%r" %% sleep)
threading.Thread(target=child).start()
raise SystemExit
""" % setup_5],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
stdout = stdout.strip()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
assert re.match('^Woke up, sleep function is: <.*?sleep.*?>$', stdout), repr(stdout)
stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
self.assertEqual(stderr, "")
import threading
from time import sleep
def child():
sleep(1)
# As a non-daemon thread we SHOULD wake up and nothing
# should be torn down yet
print("Woke up, sleep function is: %%r" %% sleep)
threading.Thread(target=child).start()
raise SystemExit
""" % setup_4],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
stdout = stdout.strip()
stdout = stdout.decode('utf-8')
stderr = stderr.decode('utf-8')
assert re.match('^Woke up, sleep function is: <.*?sleep.*?>$', stdout), repr(stdout)
stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
self.assertEqual(stderr, "")
def test_enumerate_after_join(self):
# Try hard to trigger #1703448: a thread is still returned in
......@@ -383,7 +377,7 @@ class ThreadTests(unittest.TestCase):
finally:
sys.setcheckinterval(old_interval)
if sys.version_info[:2] > (2, 5) and not hasattr(sys, 'pypy_version_info'):
if not hasattr(sys, 'pypy_version_info'):
def test_no_refcycle_through_target(self):
class RunSelfFunction(object):
def __init__(self, should_raise):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment