Commit 18f88b29, authored by Leif Walsh, committed by Yoni Fogel

Make the test runner throttle the number of large tests running concurrently, to prevent excessive swapping

git-svn-id: file:///svn/toku/tokudb@42116 c7de825b-a66e-492c-adef-691d508d4ae1
parent ab191165
#!/usr/bin/python #!/usr/bin/env python
""" """
A script for running our stress tests repeatedly to see if any fail. A script for running our stress tests repeatedly to see if any fail.
...@@ -23,7 +23,7 @@ from glob import glob ...@@ -23,7 +23,7 @@ from glob import glob
from logging import debug, info, warning, error, exception from logging import debug, info, warning, error, exception
from optparse import OptionParser from optparse import OptionParser
from Queue import Queue from Queue import Queue
from random import randrange from random import randrange, shuffle
from resource import setrlimit, RLIMIT_CORE from resource import setrlimit, RLIMIT_CORE
from shutil import copy, copytree, move, rmtree from shutil import copy, copytree, move, rmtree
from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM
...@@ -60,7 +60,7 @@ class Killed(Exception): ...@@ -60,7 +60,7 @@ class Killed(Exception):
pass pass
class TestRunnerBase(object): class TestRunnerBase(object):
def __init__(self, scheduler, tokudb, rev, execf, tsize, csize, test_time, savedir, log): def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir, log):
self.scheduler = scheduler self.scheduler = scheduler
self.tokudb = tokudb self.tokudb = tokudb
self.rev = rev self.rev = rev
...@@ -70,8 +70,19 @@ class TestRunnerBase(object): ...@@ -70,8 +70,19 @@ class TestRunnerBase(object):
self.test_time = test_time self.test_time = test_time
self.savedir = savedir self.savedir = savedir
self.env = os.environ self.env = os.environ
self.env['LD_LIBRARY_PATH'] = '%s:%s' % (os.path.join(self.tokudb, 'lib'),
self.env['LD_LIBRARY_PATH']) libpath = os.path.join(self.tokudb, 'lib')
if 'LD_LIBRARY_PATH' in self.env:
self.env['LD_LIBRARY_PATH'] = '%s:%s' % (libpath, self.env['LD_LIBRARY_PATH'])
else:
self.env['LD_LIBRARY_PATH'] = libpath
if jemalloc is not None and len(jemalloc) > 0:
preload = os.path.normpath(jemalloc)
if 'LD_PRELOAD' in self.env:
self.env['LD_PRELOAD'] = '%s:%s' % (preload, self.env['LD_PRELOAD'])
else:
self.env['LD_PRELOAD'] = preload
loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize) loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize)
self.logger = logging.getLogger(loggername) self.logger = logging.getLogger(loggername)
...@@ -86,9 +97,10 @@ class TestRunnerBase(object): ...@@ -86,9 +97,10 @@ class TestRunnerBase(object):
self.tmplogname = None self.tmplogname = None
self.phase = 0 self.phase = 0
self.times = [0, 0, 0] self.times = [0, 0, 0]
self.is_large = (tsize >= 10000000)
def __str__(self): def __str__(self):
return '%s(tsize=%d, csize=%d)' % (self.execf, self.tsize, self.csize) return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize)
def run(self): def run(self):
srctests = os.path.join(self.tokudb, 'src', 'tests') srctests = os.path.join(self.tokudb, 'src', 'tests')
...@@ -113,10 +125,13 @@ class TestRunnerBase(object): ...@@ -113,10 +125,13 @@ class TestRunnerBase(object):
try: try:
try: try:
self.times[0] = time.time() self.times[0] = time.time()
debug('%s preparing.', self)
self.setup_test() self.setup_test()
self.times[1] = time.time() self.times[1] = time.time()
debug('%s testing.', self)
self.run_test() self.run_test()
self.times[2] = time.time() self.times[2] = time.time()
debug('%s done.', self)
except Killed: except Killed:
pass pass
except TestFailure: except TestFailure:
...@@ -256,6 +271,13 @@ class Worker(Thread): ...@@ -256,6 +271,13 @@ class Worker(Thread):
debug('%s starting.' % self) debug('%s starting.' % self)
while not self.scheduler.stopping.isSet(): while not self.scheduler.stopping.isSet():
test_runner = self.scheduler.get() test_runner = self.scheduler.get()
if test_runner.is_large:
if self.scheduler.nlarge + 1 > self.scheduler.maxlarge:
debug('%s pulled a large test, but there are already %d running. Putting it back.',
self, self.scheduler.nlarge)
self.scheduler.put(test_runner)
continue
self.scheduler.nlarge += 1
try: try:
test_runner.run() test_runner.run()
except Exception, e: except Exception, e:
...@@ -263,12 +285,14 @@ class Worker(Thread): ...@@ -263,12 +285,14 @@ class Worker(Thread):
info('Killing all workers.') info('Killing all workers.')
self.scheduler.error = e self.scheduler.error = e
self.scheduler.stop() self.scheduler.stop()
if test_runner.is_large:
self.scheduler.nlarge -= 1
if not self.scheduler.stopping.isSet(): if not self.scheduler.stopping.isSet():
self.scheduler.put(test_runner) self.scheduler.put(test_runner)
debug('%s exiting.' % self) debug('%s exiting.' % self)
class Scheduler(Queue): class Scheduler(Queue):
def __init__(self, nworkers): def __init__(self, nworkers, maxlarge):
Queue.__init__(self) Queue.__init__(self)
info('Initializing scheduler with %d jobs.', nworkers) info('Initializing scheduler with %d jobs.', nworkers)
self.nworkers = nworkers self.nworkers = nworkers
...@@ -278,6 +302,8 @@ class Scheduler(Queue): ...@@ -278,6 +302,8 @@ class Scheduler(Queue):
self.stopping = Event() self.stopping = Event()
self.timer = None self.timer = None
self.error = None self.error = None
self.nlarge = 0 # not thread safe, don't really care right now
self.maxlarge = maxlarge
def run(self, timeout): def run(self, timeout):
info('Starting workers.') info('Starting workers.')
...@@ -371,20 +397,22 @@ def main(opts): ...@@ -371,20 +397,22 @@ def main(opts):
info('Saving pass/fail logs to %s.', opts.log) info('Saving pass/fail logs to %s.', opts.log)
info('Saving failure environments to %s.', opts.savedir) info('Saving failure environments to %s.', opts.savedir)
scheduler = Scheduler(opts.jobs) scheduler = Scheduler(opts.jobs, opts.maxlarge)
runners = [] runners = []
for tsize in [2000, 200000, 50000000]: for tsize in [2000, 200000, 50000000]:
for csize in [50 * tsize, 1000 ** 3]: for csize in [50 * tsize, 1000 ** 3]:
for test in testnames: for test in testnames:
runners.append(TestRunner(scheduler, opts.tokudb, rev, runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time, test, tsize, csize, opts.test_time,
opts.savedir, opts.log)) opts.savedir, opts.log))
for test in recover_testnames: for test in recover_testnames:
runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time, test, tsize, csize, opts.test_time,
opts.savedir, opts.log)) opts.savedir, opts.log))
shuffle(runners)
for runner in runners: for runner in runners:
scheduler.put(runner) scheduler.put(runner)
...@@ -476,9 +504,10 @@ if __name__ == '__main__': ...@@ -476,9 +504,10 @@ if __name__ == '__main__':
parser.add_option('-t', '--test_time', type='int', dest='test_time', parser.add_option('-t', '--test_time', type='int', dest='test_time',
default=600, default=600,
help='time to run each test, in seconds [default=600]'), help='time to run each test, in seconds [default=600]'),
parser.add_option('-j', '--jobs', type='int', dest='jobs', parser.add_option('-j', '--jobs', type='int', dest='jobs', default=8,
default=8,
help='how many concurrent tests to run [default=8]') help='how many concurrent tests to run [default=8]')
parser.add_option('--maxlarge', type='int', dest='maxlarge', default=2,
help='maximum number of large tests to run concurrently (helps prevent swapping) [default=2]')
parser.add_option('--no-build', action='store_false', dest='build', default=True, parser.add_option('--no-build', action='store_false', dest='build', default=True,
help="don't build before testing [default=do build]") help="don't build before testing [default=do build]")
parser.add_option('--rebuild_period', type='int', dest='rebuild_period', parser.add_option('--rebuild_period', type='int', dest='rebuild_period',
...@@ -486,6 +515,8 @@ if __name__ == '__main__': ...@@ -486,6 +515,8 @@ if __name__ == '__main__':
help='how many seconds between svn up and rebuild, 0 means never rebuild [default=24 hours]') help='how many seconds between svn up and rebuild, 0 means never rebuild [default=24 hours]')
parser.add_option('--cc', type='string', dest='cc', default='icc', parser.add_option('--cc', type='string', dest='cc', default='icc',
help='which compiler to use [default=icc]') help='which compiler to use [default=icc]')
parser.add_option('--jemalloc', type='string', dest='jemalloc',
help='a libjemalloc.so to put in LD_PRELOAD when running tests')
(opts, args) = parser.parse_args() (opts, args) = parser.parse_args()
if len(args) > 0: if len(args) > 0:
parser.print_usage() parser.print_usage()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment