Commit 11cf7d7c authored by Ralf Schmitt's avatar Ralf Schmitt

pep8 cleanup test_threading_2.py

parent 63e7f1f9
......@@ -44,16 +44,22 @@ import weakref
import lock_tests
# A trivial mutable counter.
class Counter(object):
def __init__(self):
self.value = 0
def inc(self):
self.value += 1
def dec(self):
self.value -= 1
def get(self):
return self.value
class TestThread(threading.Thread):
def __init__(self, name, testcase, sema, mutex, nrunning):
threading.Thread.__init__(self, name=name)
......@@ -86,6 +92,7 @@ class TestThread(threading.Thread):
print '%s is finished. %d tasks are running' % (
self.name, self.nrunning.get())
class ThreadTests(unittest.TestCase):
# Create a bunch of threads, let each do some work, wait until all are
......@@ -103,7 +110,7 @@ class ThreadTests(unittest.TestCase):
threads = []
for i in range(NUMTASKS):
t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
t = TestThread("<thread %d>" % i, self, sema, mutex, numrunning)
threads.append(t)
if hasattr(t, 'ident'):
self.failUnlessEqual(t.ident, None)
......@@ -127,6 +134,7 @@ class ThreadTests(unittest.TestCase):
def test_ident_of_no_threading_threads(self):
# The ident still must work for the main thread and dummy threads.
self.assertFalse(threading.currentThread().ident is None)
def f():
ident.append(threading.currentThread().ident)
done.set()
......@@ -220,7 +228,7 @@ class ThreadTests(unittest.TestCase):
worker_saw_exception.set()
t = Worker()
t.daemon = True # so if this fails, we don't hang Python at shutdown
t.daemon = True # so if this fails, we don't hang Python at shutdown
t.start()
if verbose:
print " started worker thread"
......@@ -241,7 +249,7 @@ class ThreadTests(unittest.TestCase):
if verbose:
print " attempting to raise asynch exception in worker"
result = set_async_exc(ctypes.c_long(t.id), exception)
self.assertEqual(result, 1) # one thread state modified
self.assertEqual(result, 1) # one thread state modified
if verbose:
print " waiting for worker to say it caught the exception"
worker_saw_exception.wait(timeout=10)
......@@ -393,7 +401,7 @@ class ThreadTests(unittest.TestCase):
self.should_raise = should_raise
self.thread = threading.Thread(target=self._run,
args=(self,),
kwargs={'yet_another':self})
kwargs={'yet_another': self})
self.thread.start()
def _run(self, other_ref, yet_another):
......@@ -450,7 +458,6 @@ class ThreadJoinOnShutdown(unittest.TestCase):
"""
self._run_and_join(script)
def test_2_join_in_forked_process(self):
# Like the test above, but from a forked interpreter
import os
......@@ -512,7 +519,7 @@ class ThreadingExceptionTests(unittest.TestCase):
def test_joining_current_thread(self):
current_thread = threading.current_thread()
self.assertRaises(RuntimeError, current_thread.join);
self.assertRaises(RuntimeError, current_thread.join)
def test_joining_inactive_thread(self):
thread = threading.Thread()
......@@ -527,22 +534,28 @@ class ThreadingExceptionTests(unittest.TestCase):
class LockTests(lock_tests.LockTests):
locktype = staticmethod(threading.Lock)
class RLockTests(lock_tests.RLockTests):
locktype = staticmethod(threading.RLock)
class EventTests(lock_tests.EventTests):
eventtype = staticmethod(threading.Event)
class ConditionAsRLockTests(lock_tests.RLockTests):
# An Condition uses an RLock by default and exports its API.
locktype = staticmethod(threading.Condition)
class ConditionTests(lock_tests.ConditionTests):
condtype = staticmethod(threading.Condition)
class SemaphoreTests(lock_tests.SemaphoreTests):
semtype = staticmethod(threading.Semaphore)
class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
semtype = staticmethod(threading.BoundedSemaphore)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment