Commit f70d357b authored by Jeremy Hylton's avatar Jeremy Hylton

Add TestThread class that helps manage threads started by a unittest.

Use TestThread as base class for WorkerThread in CommitLockTests.
parent 46d79eae
......@@ -13,13 +13,12 @@
##############################################################################
"""Tests of the distributed commit lock."""
import threading
from ZODB.Transaction import Transaction
from ZODB.tests.StorageTestBase import zodb_pickle, MinPO
import ZEO.ClientStorage
from ZEO.Exceptions import Disconnected
from ZEO.tests.TestThread import TestThread
ZERO = '\0'*8
......@@ -27,19 +26,18 @@ class DummyDB:
def invalidate(self, *args):
pass
class WorkerThread(threading.Thread):
class WorkerThread(TestThread):
# run the entire test in a thread so that the blocking call for
# tpc_vote() doesn't hang the test suite.
def __init__(self, storage, trans, method="tpc_finish"):
def __init__(self, testcase, storage, trans, method="tpc_finish"):
self.storage = storage
self.trans = trans
self.method = method
threading.Thread.__init__(self)
self.setDaemon(1)
TestThread.__init__(self, testcase)
def run(self):
def testrun(self):
try:
self.storage.tpc_begin(self.trans)
oid = self.storage.new_oid()
......@@ -151,15 +149,13 @@ class CommitLockTests:
def _dosetup2(self, storage, trans, tid):
self._threads = []
t = WorkerThread(storage, trans)
t = WorkerThread(self, storage, trans)
self._threads.append(t)
t.start()
def _dowork2(self, method_name):
for t in self._threads:
t.join(10)
for t in self._threads:
self.failIf(t.isAlive())
t.cleanup()
def _duplicate_client(self):
"Open another ClientStorage to the same server."
......
##############################################################################
#
# Copyright (c) 2002 Zope Corporation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.0 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE
#
##############################################################################
"""A Thread base class for use with unittest."""
from cStringIO import StringIO
import threading
import traceback
class TestThread(threading.Thread):
__super_init = threading.Thread.__init__
__super_run = threading.Thread.run
def __init__(self, testcase, group=None, target=None, name=None,
args=(), kwargs={}, verbose=None):
self.__super_init(group, target, name, args, kwargs, verbose)
self.setDaemon(1)
self._testcase = testcase
def run(self):
try:
self.testrun()
except Exception, err:
s = StringIO()
traceback.print_exc(file=s)
self._testcase.fail("Exception in thread %s:\n%s\n" %
(self, s.getvalue()))
def cleanup(self, timeout=15):
self.join(timeout)
if self.isAlive():
self._testcase.fail("Thread did not finish: %s" % self)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment