Commit ff6d1cd1 authored by Denis Bilenko's avatar Denis Bilenko

test_threading_2.py: make sure we use gevent in spawned subprocesses as well;...

test_threading_2.py: make sure we use gevent in spawned subprocesses as well; this fixes a number of failures
parent 4bfb58e6
# testing gevent's Event, Lock, RLock, Semaphore, BoundedSemaphore with standard test_threading
from __future__ import with_statement
from gevent import monkey; monkey.patch_all()
setup = '''from gevent import monkey; monkey.patch_all()
from gevent.event import Event
from gevent.coros import RLock, Semaphore, BoundedSemaphore
from gevent.thread import allocate_lock as Lock
import test.test_support
from test.test_support import verbose
import random
import re
import sys
import threading
import thread
import time
import unittest
import weakref
threading.Event = Event
threading.Lock = Lock
threading.RLock = RLock
......@@ -27,8 +17,29 @@ if not hasattr(threading.Thread, 'name'):
threading.Thread.name = property(lambda self: self.getName())
if not hasattr(threading.Thread, 'is_alive'):
threading.Thread.is_alive = threading.Thread.isAlive
if not hasattr(threading.Thread, 'daemon'):
threading.Thread.daemon = property(threading.Thread.isDaemon, threading.Thread.setDaemon)
if not hasattr(threading._Condition, 'notify_all'):
threading._Condition.notify_all = threading._Condition.notifyAll
'''
exec setup
setup_3 = '\n'.join(' %s' % line for line in setup.split('\n'))
setup_4 = '\n'.join(' %s' % line for line in setup.split('\n'))
setup_5 = '\n'.join(' %s' % line for line in setup.split('\n'))
import test.test_support
from test.test_support import verbose
import random
import re
import sys
import threading
import thread
import time
import unittest
import weakref
import lock_tests
......@@ -271,6 +282,7 @@ class ThreadTests(unittest.TestCase):
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
%s
import ctypes, sys, time, thread
# This lock is used as a simple event variable.
......@@ -294,7 +306,7 @@ class ThreadTests(unittest.TestCase):
thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
"""])
""" % setup_4])
self.assertEqual(rc, 42)
def test_finalize_with_trace(self):
......@@ -302,6 +314,7 @@ class ThreadTests(unittest.TestCase):
# Avoid a deadlock when sys.settrace steps into threading._shutdown
import subprocess
rc = subprocess.call([sys.executable, "-c", """if 1:
%s
import sys, threading
# A deadlock-killer, to prevent the
......@@ -321,7 +334,7 @@ class ThreadTests(unittest.TestCase):
return func
sys.settrace(func)
"""])
""" % setup_3])
self.failIf(rc == 2, "interpreted was blocked")
self.failUnless(rc == 0, "Unexpected error")
......@@ -331,6 +344,7 @@ class ThreadTests(unittest.TestCase):
# Raising SystemExit skipped threading._shutdown
import subprocess
p = subprocess.Popen([sys.executable, "-c", """if 1:
%s
import threading
from time import sleep
......@@ -342,12 +356,12 @@ class ThreadTests(unittest.TestCase):
threading.Thread(target=child).start()
raise SystemExit
"""],
""" % setup_5],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
self.assertEqual(stdout.strip(),
"Woke up, sleep function is: <built-in function sleep>")
stdout = stdout.strip()
assert re.match('^Woke up, sleep function is: <.*?sleep.*?>$', stdout), repr(stdout)
stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
self.assertEqual(stderr, "")
......@@ -407,13 +421,14 @@ class ThreadJoinOnShutdown(unittest.TestCase):
def _run_and_join(self, script):
script = """if 1:
%s
import sys, os, time, threading
# a thread, which waits for the main program to terminate
def joiningfunc(mainthread):
mainthread.join()
print 'end of thread'
\n""" + script
\n""" % setup_3 + script
import subprocess
p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment