Commit 14726406 authored by Leif Walsh's avatar Leif Walsh Committed by Yoni Fogel

stress test runner: better logging, refactoring, and saving prepared environments between test runs

git-svn-id: file:///svn/toku/tokudb@42140 c7de825b-a66e-492c-adef-691d508d4ae1
parent e595d524
...@@ -29,7 +29,6 @@ from shutil import copy, copytree, move, rmtree ...@@ -29,7 +29,6 @@ from shutil import copy, copytree, move, rmtree
from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM from signal import signal, SIGHUP, SIGINT, SIGPIPE, SIGALRM, SIGTERM
from subprocess import call, Popen, PIPE, STDOUT from subprocess import call, Popen, PIPE, STDOUT
from tempfile import mkdtemp, mkstemp from tempfile import mkdtemp, mkstemp
from thread import get_ident
from threading import Event, Thread, Timer from threading import Event, Thread, Timer
__version__ = '$Id$' __version__ = '$Id$'
...@@ -53,6 +52,12 @@ def setlimits(): ...@@ -53,6 +52,12 @@ def setlimits():
setrlimit(RLIMIT_CORE, (-1, -1)) setrlimit(RLIMIT_CORE, (-1, -1))
os.nice(7) os.nice(7)
def timestr(timeval):
if timeval == 0:
return 'None'
else:
return time.ctime(timeval)
class TestFailure(Exception): class TestFailure(Exception):
pass pass
...@@ -60,7 +65,7 @@ class Killed(Exception): ...@@ -60,7 +65,7 @@ class Killed(Exception):
pass pass
class TestRunnerBase(object): class TestRunnerBase(object):
def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir, log): def __init__(self, scheduler, tokudb, rev, jemalloc, execf, tsize, csize, test_time, savedir):
self.scheduler = scheduler self.scheduler = scheduler
self.tokudb = tokudb self.tokudb = tokudb
self.rev = rev self.rev = rev
...@@ -69,8 +74,8 @@ class TestRunnerBase(object): ...@@ -69,8 +74,8 @@ class TestRunnerBase(object):
self.csize = csize self.csize = csize
self.test_time = test_time self.test_time = test_time
self.savedir = savedir self.savedir = savedir
self.env = os.environ
self.env = os.environ
libpath = os.path.join(self.tokudb, 'lib') libpath = os.path.join(self.tokudb, 'lib')
if 'LD_LIBRARY_PATH' in self.env: if 'LD_LIBRARY_PATH' in self.env:
self.env['LD_LIBRARY_PATH'] = '%s:%s' % (libpath, self.env['LD_LIBRARY_PATH']) self.env['LD_LIBRARY_PATH'] = '%s:%s' % (libpath, self.env['LD_LIBRARY_PATH'])
...@@ -84,129 +89,105 @@ class TestRunnerBase(object): ...@@ -84,129 +89,105 @@ class TestRunnerBase(object):
else: else:
self.env['LD_PRELOAD'] = preload self.env['LD_PRELOAD'] = preload
loggername = '%s-%d-%d' % (self.execf, self.tsize, self.csize)
self.logger = logging.getLogger(loggername)
self.logger.propagate = False
self.logger.setLevel(logging.INFO)
logfile = os.path.join(log, loggername)
self.logger.addHandler(logging.FileHandler(logfile))
self.nruns = 0 self.nruns = 0
self.rundir = None self.rundir = None
self.tmplog = None self.outf = None
self.tmplogname = None self.times = [0, 0]
self.phase = 0
self.times = [0, 0, 0]
self.is_large = (tsize >= 10000000) self.is_large = (tsize >= 10000000)
def __str__(self): def __str__(self):
return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize) return 'TestRunner<%s, %d, %d>' % (self.execf, self.tsize, self.csize)
def infostr(self):
return '%(execf)s\t%(rev)s\t%(tsize)d\t%(csize)d\t%(num_ptquery)d\t%(num_update)d\t%(time)d' % self
def __getitem__(self, k): def __getitem__(self, k):
if k == 'time1': return self.__getattribute__(k)
return self.timestr(0)
elif k == 'time2':
return self.timestr(1)
elif k == 'time3':
return self.timestr(2)
else:
return self.__dict__[k]
def run(self): @property
srctests = os.path.join(self.tokudb, 'src', 'tests') def time(self):
self.rundir = mkdtemp(dir=srctests) if self.times[0] != 0 and self.times[1] != 0:
(tmplog, self.tmplogname) = mkstemp() return self.times[1] - self.times[0]
self.tmplog = os.fdopen(tmplog) else:
return 0
@property
def num_ptquery(self):
if self.nruns % 2 < 1: if self.nruns % 2 < 1:
self.ptquery = 1 return 1
else: else:
self.ptquery = randrange(16) return randrange(16)
@property
def num_update(self):
if self.nruns % 4 < 2: if self.nruns % 4 < 2:
self.update = 1 return 1
else: else:
self.update = randrange(16) return randrange(16)
self.envdir = ('../%s-%d-%d-%d-%d-%d.dir' % @property
(self.execf, self.tsize, self.csize, def prepareloc(self):
self.ptquery, self.update, get_ident())) preparename = 'dir.%(execf)s-%(tsize)d-%(csize)d' % self
return os.path.join(self.tokudb, 'src', 'tests', preparename)
def prepare(self):
if os.path.isdir(self.prepareloc):
debug('%s found existing environment.', self)
copytree(self.prepareloc, os.path.join(self.rundir, self.envdir))
else:
debug('%s preparing an environment.', self)
self.run_prepare()
debug('%s copying environment to %s.', self, self.prepareloc)
copytree(os.path.join(self.rundir, self.envdir), self.prepareloc)
def delete_prepared_env(self):
if os.path.isdir(self.prepareloc):
rmtree(self.prepareloc)
def run(self):
srctests = os.path.join(self.tokudb, 'src', 'tests')
self.rundir = mkdtemp(dir=srctests)
self.envdir = '%(execf)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d' % self
try: try:
outname = os.path.join(self.rundir, 'output.txt')
self.outf = open(outname, 'w')
try: try:
self.times[0] = time.time() self.prepare()
debug('%s preparing.', self)
self.setup_test()
self.times[1] = time.time()
debug('%s testing.', self) debug('%s testing.', self)
self.times[0] = time.time()
self.run_test() self.run_test()
self.times[2] = time.time() self.times[1] = time.time()
debug('%s done.', self) debug('%s done.', self)
except Killed: except Killed:
pass pass
except TestFailure: except TestFailure:
savedir = self.save() savedir = self.save()
self.print_failure() self.scheduler.report_failure(self)
warning('Saved environment to %s', savedir) warning('Saved environment to %s', savedir)
else: else:
self.print_success() self.scheduler.report_success(self)
finally: finally:
fullenvdir = os.path.join(self.rundir, self.envdir) self.outf.close()
rmtree(fullenvdir, ignore_errors=True) rmtree(self.rundir)
self.envdir = None
rmtree(self.rundir, ignore_errors=True)
self.rundir = None self.rundir = None
self.tmplog.close() self.times = [0, 0]
self.tmplog = None
if os.path.exists(self.tmplogname):
os.remove(self.tmplogname)
self.tmplogname = None
self.times = [0, 0, 0]
self.nruns += 1 self.nruns += 1
def save(self): def save(self):
savepfx = ('%s-%s-%d-%d-%d-%d-%s-' % savepfx = '%(execf)s-%(rev)s-%(tsize)d-%(csize)d-%(num_ptquery)d-%(num_update)d-%(phase)s' % self
(self.execf, self.rev, self.tsize, self.csize,
self.ptquery, self.update, self.phase))
savedir = mkdtemp(dir=self.savedir, prefix=savepfx) savedir = mkdtemp(dir=self.savedir, prefix=savepfx)
def targetfor(path): def targetfor(path):
return os.path.join(savedir, os.path.basename(path)) return os.path.join(savedir, os.path.basename(path))
if os.path.exists(self.tmplogname):
move(self.tmplogname, targetfor("output.txt")) copytree(self.rundir, targetfor(self.rundir))
for core in glob(os.path.join(self.rundir, "core*")):
move(core, targetfor(core))
if os.path.exists(self.envdir):
copytree(self.envdir, targetfor(self.envdir))
fullexecf = os.path.join(self.tokudb, 'src', 'tests', self.execf) fullexecf = os.path.join(self.tokudb, 'src', 'tests', self.execf)
copy(fullexecf, targetfor(fullexecf)) copy(fullexecf, targetfor(fullexecf))
for lib in glob(os.path.join(self.tokudb, 'lib', '*')): for lib in glob(os.path.join(self.tokudb, 'lib', '*.so')):
copy(lib, targetfor(lib)) copy(lib, targetfor(lib))
return savedir
def timestr(self, phase):
timeval = self.times[phase]
if timeval == 0:
return ''
else:
return time.ctime(timeval)
def infostr(self, result): return savedir
return ('[PASS=%d FAIL=%d] %s %s tsize=%d csize=%d ptquery=%d update=%d' %
(self.scheduler.passed, self.scheduler.failed, result,
self.execf, self.tsize, self.csize, self.ptquery, self.update))
def logstr(self, result):
fmtstr = result + '\t%(execf)s\t%(rev)s\t%(tsize)d\t%(csize)d\t%(ptquery)d\t%(update)d\t%(time1)s\t%(time2)s\t%(time3)s'
return fmtstr % self
def print_success(self):
self.scheduler.passed += 1
self.logger.info(self.logstr('PASSED'))
info(self.infostr('PASSED'))
def print_failure(self):
self.scheduler.failed += 1
self.logger.warning(self.logstr('FAILED'))
warning(self.infostr('FAILED'))
def waitfor(self, proc): def waitfor(self, proc):
while proc.poll() is None: while proc.poll() is None:
...@@ -215,49 +196,52 @@ class TestRunnerBase(object): ...@@ -215,49 +196,52 @@ class TestRunnerBase(object):
os.kill(proc.pid, SIGTERM) os.kill(proc.pid, SIGTERM)
raise Killed() raise Killed()
def defaultargs(self, timed): def spawn_child(self, args):
a = ['-v', proc = Popen([self.execf] + args,
'--envdir', self.envdir,
'--num_elements', str(self.tsize),
'--cachetable_size', str(self.csize)]
if timed:
a += ['--num_seconds', str(self.test_time),
'--no-crash_on_update_failure',
'--num_ptquery_threads', str(self.ptquery),
'--num_update_threads', str(self.update)]
return a
def spawn_child(self, mode, timed):
proc = Popen([self.execf, mode] + self.defaultargs(timed),
executable=os.path.join('..', self.execf), executable=os.path.join('..', self.execf),
env=self.env, env=self.env,
cwd=self.rundir, cwd=self.rundir,
preexec_fn=setlimits, preexec_fn=setlimits,
stdout=self.tmplog, stdout=self.outf,
stderr=STDOUT) stderr=STDOUT)
self.waitfor(proc) self.waitfor(proc)
return proc.returncode return proc.returncode
def prepareargs(self):
return ['-v',
'--envdir', self.envdir,
'--num_elements', str(self.tsize),
'--cachetable_size', str(self.csize)]
def testargs(self):
return ['--num_seconds', str(self.test_time),
'--no-crash_on_update_failure',
'--num_ptquery_threads', str(self.num_ptquery),
'--num_update_threads', str(self.num_update)] + self.prepareargs()
class TestRunner(TestRunnerBase): class TestRunner(TestRunnerBase):
def setup_test(self): def run_prepare(self):
self.phase = "create" self.phase = "create"
if self.spawn_child('--only_create', False) != 0: if self.spawn_child(['--only_create'] + self.prepareargs()) != 0:
raise TestFailure('%s crashed during --only_create.' % self.execf) raise TestFailure('%s crashed during --only_create.' % self.execf)
def run_test(self): def run_test(self):
self.phase = "stress" self.phase = "stress"
if self.spawn_child('--only_stress', True) != 0: if self.spawn_child(['--only_stress'] + self.testargs()) != 0:
raise TestFailure('%s crashed during --only_stress.' % self.execf) raise TestFailure('%s crashed during --only_stress.' % self.execf)
class RecoverTestRunner(TestRunnerBase): class RecoverTestRunner(TestRunnerBase):
def setup_test(self): def run_prepare(self):
self.phase = "test" self.phase = "create"
if self.spawn_child('--test', True) == 0: if self.spawn_child(['--only_create', '--test'] + self.prepareargs()) != 0:
raise TestFailure('%s did not crash during --test' % self.execf) raise TestFailure('%s crashed during --only_create --test.' % self.execf)
def run_test(self): def run_test(self):
self.phase = "test"
if self.spawn_child(['--only_stress', '--test'] + self.testargs()) == 0:
raise TestFailure('%s did not crash during --only_stress --test' % self.execf)
self.phase = "recover" self.phase = "recover"
if self.spawn_child('--recover', False) != 0: if self.spawn_child(['--recover'] + self.prepareargs()) != 0:
raise TestFailure('%s crashed during --recover' % self.execf) raise TestFailure('%s crashed during --recover' % self.execf)
class Worker(Thread): class Worker(Thread):
...@@ -290,18 +274,19 @@ class Worker(Thread): ...@@ -290,18 +274,19 @@ class Worker(Thread):
debug('%s exiting.' % self) debug('%s exiting.' % self)
class Scheduler(Queue): class Scheduler(Queue):
def __init__(self, nworkers, maxlarge): def __init__(self, nworkers, maxlarge, logger):
Queue.__init__(self) Queue.__init__(self)
info('Initializing scheduler with %d jobs.', nworkers) info('Initializing scheduler with %d jobs.', nworkers)
self.nworkers = nworkers self.nworkers = nworkers
self.logger = logger
self.maxlarge = maxlarge
self.nlarge = 0 # not thread safe, don't really care right now
self.passed = 0 self.passed = 0
self.failed = 0 self.failed = 0
self.workers = [] self.workers = []
self.stopping = Event() self.stopping = Event()
self.timer = None self.timer = None
self.error = None self.error = None
self.nlarge = 0 # not thread safe, don't really care right now
self.maxlarge = maxlarge
def run(self, timeout): def run(self, timeout):
info('Starting workers.') info('Starting workers.')
...@@ -338,6 +323,22 @@ class Scheduler(Queue): ...@@ -338,6 +323,22 @@ class Scheduler(Queue):
info('Stopping workers.') info('Stopping workers.')
self.stopping.set() self.stopping.set()
def __getitem__(self, k):
return self.__dict__[k]
def reportstr(self):
return '[PASS=%(passed)d FAIL=%(failed)d]' % self
def report_success(self, runner):
self.passed += 1
self.logger.info('PASSED %s', runner.infostr())
info('%s PASSED %s', self.reportstr(), runner.infostr())
def report_failure(self, runner):
self.failed += 1
self.logger.warning('FAILED %s', runner.infostr())
warning('%s FAILED %s', self.reportstr(), runner.infostr())
def compiler_works(cc): def compiler_works(cc):
try: try:
devnull = open(os.devnull, 'w') devnull = open(os.devnull, 'w')
...@@ -388,26 +389,29 @@ def main(opts): ...@@ -388,26 +389,29 @@ def main(opts):
if opts.build: if opts.build:
rebuild(opts.tokudb, opts.cc) rebuild(opts.tokudb, opts.cc)
rev = revfor(opts.tokudb) rev = revfor(opts.tokudb)
if not os.path.exists(opts.log):
os.mkdir(opts.log)
if not os.path.exists(opts.savedir): if not os.path.exists(opts.savedir):
os.mkdir(opts.savedir) os.mkdir(opts.savedir)
logger = logging.getLogger('stress')
logger.propagate = False
logger.setLevel(logging.INFO)
logger.addHandler(logging.FileHandler(opts.log))
info('Saving pass/fail logs to %s.', opts.log) info('Saving pass/fail logs to %s.', opts.log)
info('Saving failure environments to %s.', opts.savedir) info('Saving failure environments to %s.', opts.savedir)
scheduler = Scheduler(opts.jobs, opts.maxlarge) scheduler = Scheduler(opts.jobs, opts.maxlarge, logger)
runners = [] runners = []
for tsize in [2000, 200000, 50000000]: for tsize in [2000, 200000, 50000000]:
for csize in [50 * tsize, 1000 ** 3]: for csize in [50 * tsize, 1000 ** 3]:
for test in testnames: for test in testnames:
runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc, runners.append(TestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time, test, tsize, csize, opts.test_time, opts.savedir))
opts.savedir, opts.log))
for test in recover_testnames: for test in recover_testnames:
runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc, runners.append(RecoverTestRunner(scheduler, opts.tokudb, rev, opts.jemalloc,
test, tsize, csize, opts.test_time, test, tsize, csize, opts.test_time, opts.savedir))
opts.savedir, opts.log))
shuffle(runners) shuffle(runners)
...@@ -420,6 +424,8 @@ def main(opts): ...@@ -420,6 +424,8 @@ def main(opts):
if scheduler.error is not None: if scheduler.error is not None:
error('Scheduler reported an error.') error('Scheduler reported an error.')
raise scheduler.error raise scheduler.error
for runner in runners:
runner.delete_prepared_env()
rebuild(opts.tokudb, opts.cc) rebuild(opts.tokudb, opts.cc)
rev = revfor(opts.tokudb) rev = revfor(opts.tokudb)
for runner in runners: for runner in runners:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment