Commit a9634557 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

make test runner throttle the number of large tests concurrently, to prevent excessive swapping

git-svn-id: file:///svn/toku/tokudb@42116 c7de825b-a66e-492c-adef-691d508d4ae1
parent 5b3fd4bb
#!/usr/bin/python
#!/usr/bin/env python
"""
A script for running our stress tests repeatedly to see if any fail.
......@@ -23,7 +23,7 @@ from glob import glob
from logging import debug, info, warning, error, exception
from optparse import OptionParser
from Queue import Queue
from random import randrange
from random import randrange, shuffle
from resource import setrlimit, RLIMIT_CORE
from shutil import copy, copytree, move, rmtree
from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM
......@@ -60,7 +60,7 @@ class Killed(Exception):
pass
class TestRunnerBase(object):
def __init__(self, scheduler, tokudb, rev, execf, tsize, csize, test_time, savedir, log):
def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir, log):
self.scheduler = scheduler
self.tokudb = tokudb
self.rev = rev
......@@ -70,8 +70,19 @@ class TestRunnerBase(object):
self.test_time = test_time
self.savedir = savedir
self.env = os.environ
self.env['LD_LIBRARY_PATH'] = '%s:%s' % (os.path.join(self.tokudb, 'lib'),
self.env['LD_LIBRARY_PATH'])
libpath = os.path.join(self.tokudb, 'lib')
if 'LD_LIBRARY_PATH' in self.env:
self.env['LD_LIBRARY_PATH'] = '%s:%s' % (libpath, self.env['LD_LIBRARY_PATH'])
else:
self.env['LD_LIBRARY_PATH'] = libpath
if jemalloc is not None and len(jemalloc) > 0:
preload = os.path.normpath(jemalloc)
if 'LD_PRELOAD' in self.env:
self.env['LD_PRELOAD'] = '%s:%s' % (preload, self.env['LD_PRELOAD'])
else:
self.env['LD_PRELOAD'] = preload
loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize)
self.logger = logging.getLogger(loggername)
......@@ -86,9 +97,10 @@ class TestRunnerBase(object):
self.tmplogname = None
self.phase = 0
self.times = [0, 0, 0]
self.is_large = (tsize >= 10000000)
def __str__(self):
return '%s(tsize=%d, csize=%d)' % (self.execf, self.tsize, self.csize)
return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize)
def run(self):
srctests = os.path.join(self.tokudb, 'src', 'tests')
......@@ -113,10 +125,13 @@ class TestRunnerBase(object):
try:
try:
self.times[0] = time.time()
debug('%s preparing.', self)
self.setup_test()
self.times[1] = time.time()
debug('%s testing.', self)
self.run_test()
self.times[2] = time.time()
debug('%s done.', self)
except Killed:
pass
except TestFailure:
......@@ -256,6 +271,13 @@ class Worker(Thread):
debug('%s starting.' % self)
while not self.scheduler.stopping.isSet():
test_runner = self.scheduler.get()
if test_runner.is_large:
if self.scheduler.nlarge + 1 > self.scheduler.maxlarge:
debug('%s pulled a large test, but there are already %d running. Putting it back.',
self, self.scheduler.nlarge)
self.scheduler.put(test_runner)
continue
self.scheduler.nlarge += 1
try:
test_runner.run()
except Exception, e:
......@@ -263,12 +285,14 @@ class Worker(Thread):
info('Killing all workers.')
self.scheduler.error = e
self.scheduler.stop()
if test_runner.is_large:
self.scheduler.nlarge -= 1
if not self.scheduler.stopping.isSet():
self.scheduler.put(test_runner)
debug('%s exiting.' % self)
class Scheduler(Queue):
def __init__(self, nworkers):
def __init__(self, nworkers, maxlarge):
Queue.__init__(self)
info('Initializing scheduler with %d jobs.', nworkers)
self.nworkers = nworkers
......@@ -278,6 +302,8 @@ class Scheduler(Queue):
self.stopping = Event()
self.timer = None
self.error = None
self.nlarge = 0 # not thread safe, don't really care right now
self.maxlarge = maxlarge
def run(self, timeout):
info('Starting workers.')
......@@ -371,20 +397,22 @@ def main(opts):
info('Saving pass/fail logs to %s.', opts.log)
info('Saving failure environments to %s.', opts.savedir)
scheduler = Scheduler(opts.jobs)
scheduler = Scheduler(opts.jobs, opts.maxlarge)
runners = []
for tsize in [2000, 200000, 50000000]:
for csize in [50 * tsize, 1000 ** 3]:
for test in testnames:
runners.append(TestRunner(scheduler, opts.tokudb, rev,
runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time,
opts.savedir, opts.log))
for test in recover_testnames:
runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev,
runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time,
opts.savedir, opts.log))
shuffle(runners)
for runner in runners:
scheduler.put(runner)
......@@ -476,9 +504,10 @@ if __name__ == '__main__':
parser.add_option('-t', '--test_time', type='int', dest='test_time',
default=600,
help='time to run each test, in seconds [default=600]'),
parser.add_option('-j', '--jobs', type='int', dest='jobs',
default=8,
parser.add_option('-j', '--jobs', type='int', dest='jobs', default=8,
help='how many concurrent tests to run [default=8]')
parser.add_option('--maxlarge', type='int', dest='maxlarge', default=2,
help='maximum number of large tests to run concurrently (helps prevent swapping) [default=2]')
parser.add_option('--no-build', action='store_false', dest='build', default=True,
help="don't build before testing [default=do build]")
parser.add_option('--rebuild_period', type='int', dest='rebuild_period',
......@@ -486,6 +515,8 @@ if __name__ == '__main__':
help='how many seconds between svn up and rebuild, 0 means never rebuild [default=24 hours]')
parser.add_option('--cc', type='string', dest='cc', default='icc',
help='which compiler to use [default=icc]')
parser.add_option('--jemalloc', type='string', dest='jemalloc',
help='a libjemalloc.so to put in LD_PRELOAD when running tests')
(opts, args) = parser.parse_args()
if len(args) > 0:
parser.print_usage()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment