Commit 30c861dc authored by Dieter Maurer's avatar Dieter Maurer Committed by GitHub

Merge pull request #380 from zopefoundation/racetest

`racetest` improvement
parents 989457c7 ca41ccd1
......@@ -40,6 +40,7 @@ between load/open and local invalidations to catch bugs similar to
https://github.com/zopefoundation/ZODB/issues/290 and
https://github.com/zopefoundation/ZEO/issues/166.
"""
from __future__ import print_function
import threading
from random import randint
......@@ -80,7 +81,7 @@ class T2ObjectsInc:
"""T2ObjectsInc is specification with behaviour where two objects obj1
and obj2 are incremented synchronously.
It is used in tests where bugs can be immedeately observed after the race.
It is used in tests where bugs can be immediately observed after the race.
invariant: obj1 == obj2
"""
......@@ -159,10 +160,7 @@ class RaceTests(object):
# Access to half of the objects is organized to always trigger loading
# from zstor. Access to the other half goes through zconn cache and so
# verifies whether the cache is not stale.
failed = threading.Event()
failure = [None]
def verify():
def verify(tg):
transaction.begin()
zconn = db.open()
root = zconn.root()
......@@ -176,8 +174,7 @@ class RaceTests(object):
except AssertionError as e:
msg = "verify: %s\n" % e
msg += _state_details(root)
failure[0] = msg
failed.set()
tg.fail(msg)
# we did not changed anything; also fails with commit:
transaction.abort()
......@@ -186,7 +183,7 @@ class RaceTests(object):
# `modify` changes objects in the database by executing "next" step.
#
# Spec invariant should be preserved.
def modify():
def modify(tg):
transaction.begin()
zconn = db.open()
......@@ -199,32 +196,21 @@ class RaceTests(object):
# `xrun` runs f in a loop until either N iterations, or until failed is
# set.
def xrun(f, N):
try:
for i in range(N):
# print('%s.%d' % (f.__name__, i))
f()
if failed.is_set():
break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
def xrun(tg, tx, f, N):
for i in range(N):
# print('%s.%d' % (f.__name__, i))
f(tg)
if tg.failed():
break
# loop verify and modify concurrently.
init()
N = 500
tverify = Daemon(
name='Tverify', target=xrun, args=(verify, N))
tmodify = Daemon(
name='Tmodify', target=xrun, args=(modify, N))
tverify.start()
tmodify.start()
tverify.join(60)
tmodify.join(60)
if failed.is_set():
self.fail(failure[0])
tg = TestWorkGroup(self)
tg.go(xrun, verify, N, name='Tverify')
tg.go(xrun, modify, N, name='Tmodify')
tg.wait(120)
# client-server storages like ZEO, NEO and RelStorage allow several storage
# clients to be connected to single storage server.
......@@ -285,10 +271,7 @@ class RaceTests(object):
#
# Once in a while T tries to modify the database executing spec "next"
# as test source of changes for other workers.
failed = threading.Event()
failure = [None] * nwork # [tx] is failure from T(tx)
def T(tx, N):
def T(tg, tx, N):
db = self.dbopen()
def t_():
......@@ -305,8 +288,7 @@ class RaceTests(object):
except AssertionError as e:
msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root)
failure[tx] = msg
failed.set()
tg.fail(msg)
# change objects once in a while
if randint(0, 4) == 0:
......@@ -326,11 +308,8 @@ class RaceTests(object):
for i in range(N):
# print('T%s.%d' % (tx, i))
t_()
if failed.is_set():
if tg.failed():
break
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
finally:
db.close()
......@@ -338,24 +317,17 @@ class RaceTests(object):
init()
N = 100
tg = []
for x in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N))
t.start()
tg.append(t)
for t in tg:
t.join(60)
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
tg = TestWorkGroup(self)
for _ in range(nwork):
tg.go(T, N)
tg.wait(120)
# verify storage for race in between client disconnect and external
# invalidations. https://github.com/zopefoundation/ZEO/issues/209
#
# This test is simlar to check_race_load_vs_external_invalidate, but
# This test is similar to check_race_load_vs_external_invalidate, but
# increases the number of workers and also makes every worker to repeatedly
# reconnect to the storage, so that the probability of disconection is
# reconnect to the storage, so that the probability of disconnection is
# high. It also uses T2ObjectsInc2Phase instead of T2ObjectsInc because if
# an invalidation is skipped due to the disconnect/invalidation race,
# T2ObjectsInc won't catch the bug as both objects will be either in old
......@@ -381,10 +353,7 @@ class RaceTests(object):
# `T` is similar to the T from _check_race_load_vs_external_invalidate
# but reconnects to the database often.
failed = threading.Event()
failure = [None] * nwork # [tx] is failure from T(tx)
def T(tx, N):
def T(tg, tx, N):
def t_():
def work1(db):
transaction.begin()
......@@ -400,8 +369,7 @@ class RaceTests(object):
except AssertionError as e:
msg = "T%s: %s\n" % (tx, e)
msg += _state_details(root)
failure[tx] = msg
failed.set()
tg.fail(msg)
zconn.close()
transaction.abort()
......@@ -424,37 +392,26 @@ class RaceTests(object):
db = self.dbopen()
try:
for i in range(4):
if failed.is_set():
if tg.failed():
break
work1(db)
finally:
db.close()
try:
for i in range(N):
# print('T%s.%d' % (tx, i))
if failed.is_set():
break
t_()
except: # noqa: E722 do not use bare 'except'
failed.set()
raise
for i in range(N):
# print('T%s.%d' % (tx, i))
if tg.failed():
break
t_()
# run the workers concurrently.
init()
N = 100 // (2*4) # N reduced to save time
tg = []
for x in range(nwork):
t = Daemon(name='T%d' % x, target=T, args=(x, N))
t.start()
tg.append(t)
for t in tg:
t.join(60)
if failed.is_set():
self.fail('\n\n'.join([_ for _ in failure if _]))
tg = TestWorkGroup(self)
for _ in range(nwork):
tg.go(T, N)
tg.wait(120)
# `_state_init` initializes the database according to the spec.
......@@ -468,7 +425,7 @@ def _state_init(db, spec):
zconn.close()
# `_state_invalidate_half1` invalidatates first 50% of database objects, so
# `_state_invalidate_half1` invalidates first 50% of database objects, so
# that the next time they are accessed, they are reloaded from the storage.
def _state_invalidate_half1(root):
keys = list(sorted(root.keys()))
......@@ -526,13 +483,156 @@ def _state_details(root): # -> txt
return txt
class TestWorkGroup(object):
"""TestWorkGroup represents group of threads that run together to verify
something.
- .go() adds test thread to the group.
- .wait() waits for all spawned threads to finish and reports all
collected failures to containing testcase.
- a test should indicate failure by call to .fail(), it
can check for a failure with .failed()
"""
def __init__(self, testcase):
self.testcase = testcase
self.failed_event = threading.Event()
self.fail_mu = threading.Lock()
self.failv = [] # failures registered by .fail
self.threadv = [] # spawned threads
self.waitg = WaitGroup() # to wait for spawned threads
def fail(self, msg):
"""fail adds failure to test result."""
with self.fail_mu:
self.failv.append(msg)
self.failed_event.set()
def failed(self):
"""did the test already fail."""
return self.failed_event.is_set()
def go(self, f, *argv, **kw):
"""go spawns f(self, #thread, *argv, **kw) in new test thread."""
self.waitg.add(1)
tx = len(self.threadv)
tname = kw.pop('name', 'T%d' % tx)
t = Daemon(name=tname, target=self._run, args=(f, tx, argv, kw))
self.threadv.append(t)
t.start()
def _run(self, f, tx, argv, kw):
tname = self.threadv[tx].name
try:
f(self, tx, *argv, **kw)
except Exception as e:
self.fail("Unhandled exception %r in thread %s"
% (e, tname))
raise
finally:
self.waitg.done()
def wait(self, timeout):
"""wait waits for all test threads to complete and reports all
collected failures to containing testcase."""
if not self.waitg.wait(timeout):
self.fail("test did not finish within %s seconds" % timeout)
failed_to_finish = []
for t in self.threadv:
try:
t.join(1)
except AssertionError:
self.failed_event.set()
failed_to_finish.append(t.name)
if failed_to_finish:
self.fail("threads did not finish: %s" % failed_to_finish)
del self.threadv # avoid cyclic garbage
if self.failed():
self.testcase.fail('\n\n'.join(self.failv))
class Daemon(threading.Thread):
"""auxiliary class to create daemon threads and fail if not stopped."""
"""auxiliary class to create daemon threads and fail if not stopped.
In addition, the class ensures that reports for uncaught exceptions
are output holding a lock. This prevents that concurrent reports
get intermixed and facilitates the exception analysis.
"""
def __init__(self, **kw):
super(Daemon, self).__init__(**kw)
self.daemon = True
if hasattr(self, "_invoke_excepthook"):
# Python 3.8+
ori_invoke_excepthook = self._invoke_excepthook
def invoke_excepthook(*args, **kw):
with exc_lock:
return ori_invoke_excepthook(*args, **kw)
self._invoke_excepthook = invoke_excepthook
else:
# old Python
ori_run = self.run
def run():
from threading import _format_exc
from threading import _sys
try:
ori_run()
except SystemExit:
pass
except BaseException:
if _sys and _sys.stderr is not None:
with exc_lock:
print("Exception in thread %s:\n%s" %
(self.name, _format_exc()),
file=_sys.stderr)
else:
raise
finally:
del self.run
self.run = run
def join(self, *args, **kw):
super(Daemon, self).join(*args, **kw)
if self.is_alive():
raise AssertionError("Thread %s did not stop" % self.name)
# lock to ensure that Daemon exception reports are output atomically
exc_lock = threading.Lock()
class WaitGroup(object):
"""WaitGroup provides service to wait for spawned workers to be done.
- .add() adds workers
- .done() indicates that one worker is done
- .wait() waits until all workers are done
"""
def __init__(self):
self.n = 0
self.condition = threading.Condition()
def add(self, delta):
with self.condition:
self.n += delta
if self.n < 0:
raise AssertionError("#workers is negative")
if self.n == 0:
self.condition.notify_all()
def done(self):
self.add(-1)
def wait(self, timeout): # -> ok
with self.condition:
if self.n == 0:
return True
ok = self.condition.wait(timeout)
if ok is None: # py2
ok = (self.n == 0)
return ok
##############################################################################
#
# Copyright (c) 2019 - 2023 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
from time import sleep
from unittest import TestCase
from .racetest import TestWorkGroup
class TestWorkGroupTests(TestCase):
def setUp(self):
self._failed = failed = []
case_mockup = SimpleNamespace(fail=failed.append)
self.tg = TestWorkGroup(case_mockup)
@property
def failed(self):
return "\n\n".join(self._failed)
def test_success(self):
tg = self.tg
tg.go(tg_test_function)
tg.wait(10)
self.assertEqual(self.failed, "")
def test_failure1(self):
tg = self.tg
tg.go(tg_test_function, T_FAIL)
tg.wait(10)
self.assertEqual(self.failed, "T0 failed")
def test_failure1_okmany(self):
tg = self.tg
tg.go(tg_test_function, T_SUCCESS)
tg.go(tg_test_function, T_SUCCESS)
tg.go(tg_test_function, T_SUCCESS)
tg.go(tg_test_function, T_FAIL)
tg.wait(10)
self.assertEqual(self.failed, "T3 failed")
def test_failure_many(self):
tg = self.tg
tg.go(tg_test_function, T_FAIL)
tg.go(tg_test_function, T_SUCCESS)
tg.go(tg_test_function, T_FAIL)
tg.go(tg_test_function, T_SUCCESS)
tg.go(tg_test_function, T_FAIL)
tg.wait(10)
self.assertIn("T0 failed", self.failed)
self.assertIn("T2 failed", self.failed)
self.assertIn("T4 failed", self.failed)
self.assertNotIn("T1 failed", self.failed)
self.assertNotIn("T3 failed", self.failed)
def test_exception(self):
tg = self.tg
tg.go(tg_test_function, T_EXC)
tg.wait(10)
self.assertIn("Unhandled exception", self.failed)
self.assertIn("in thread T0", self.failed)
def test_timeout(self):
tg = self.tg
tg.go(tg_test_function, T_SLOW)
tg.wait(0.1)
self.assertEqual(self.failed,
"test did not finish within 0.1 seconds")
def test_thread_unfinished(self):
tg = self.tg
tg.go(tg_test_function, T_SLOW)
tg.go(tg_test_function, T_SLOW, 2)
tg.go(tg_test_function, T_SLOW, wait_time=2)
tg.wait(0.1)
self.assertEqual(self.failed,
"test did not finish within 0.1 seconds\n\n"
"threads did not finish: ['T2']")
T_SUCCESS = 0
T_SLOW = 1
T_EXC = 3
T_FAIL = 4
def tg_test_function(tg, tx, mode=T_SUCCESS, waits=1, wait_time=0.2):
if mode == T_SUCCESS:
return
if mode == T_FAIL:
tg.fail("T%d failed" % tx)
return
if mode == T_EXC:
raise ValueError(str(tx))
assert mode == T_SLOW
while waits:
waits -= 1
if tg.failed():
return
sleep(wait_time)
try:
from types import SimpleNamespace
except ImportError:
# PY2
class SimpleNamespace(object):
def __init__(self, **kw):
self.__dict__.update(kw)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment