matrix 5.4 KB
#!/usr/bin/env python

import sys
import os
import math
import traceback
from time import time

from neo.tests.benchmark import BenchmarkRunner
from ZODB.FileStorage import FileStorage

class MatrixImportBenchmark(BenchmarkRunner):

    error_log = ''

    def add_options(self, parser):
        parser.add_option('-d', '--datafs')
        parser.add_option('', '--min-storages', type='int', default=1)
        parser.add_option('', '--max-storages', type='int', default=2)
        parser.add_option('', '--min-replicas', type='int', default=0)
        parser.add_option('', '--max-replicas', type='int', default=1)
        parser.add_option('', '--threaded', action="store_true")

    def load_options(self, options, args):
        if options.datafs and not os.path.exists(options.datafs):
            sys.exit('Missing or wrong data.fs argument')
        return dict(
            datafs = options.datafs,
            min_s = options.min_storages,
            max_s = options.max_storages,
            min_r = options.min_replicas,
            max_r = options.max_replicas,
            threaded = options.threaded,
        )

    def start(self):
        # build storage (logarithm) & replicas (linear) lists
        min_s, max_s = self._config.min_s, self._config.max_s
        min_r, max_r = self._config.min_r, self._config.max_r
        min_s2 = int(math.log(min_s, 2))
        max_s2 = int(math.log(max_s, 2))
        storages = [2 ** x for x in range(min_s2, max_s2 + 1)]
        if storages[0] < min_s:
            storages[0] = min_s
        if storages[-1] < max_s:
            storages.append(max_s)
        replicas = range(min_r, max_r + 1)
        result_list = [self.runMatrix(storages, replicas)
                       for x in xrange(self._config.repeat)]
        results = {}
        for s in storages:
            results[s] = z = {}
            for r in replicas:
                if r < s:
                    x = [x[s][r] for x in result_list if x[s][r] is not None]
                    if x:
                        z[r] = max(x)
                    else:
                        z[r] = None
        return self.buildReport(storages, replicas, results)

    def runMatrix(self, storages, replicas):
        stats = {}
        for s in storages:
            stats[s] = z = {}
            for r in replicas:
                if r < s:
                    z[r] = self.runImport(1, s, r, 100)
        return stats

    def runImport(self, masters, storages, replicas, partitions):
        datafs = self._config.datafs
        if datafs:
            dfs_storage = FileStorage(file_name=self._config.datafs)
        else:
            datafs = 'PROD1'
            import random, neo.tests.stat_zodb
            dfs_storage = getattr(neo.tests.stat_zodb, datafs)(
                random.Random(0)).as_storage(10000)
        print "Import of %s with m=%s, s=%s, r=%s, p=%s" % (
                datafs, masters, storages, replicas, partitions)
        if self._config.threaded:
            from neo.tests.threaded import NEOCluster
        else:
            from neo.tests.functional import NEOCluster
        neo = NEOCluster(
            db_list=['neot_matrix_%d' % i for i in xrange(storages)],
            clear_databases=True,
            master_count=masters,
            partitions=partitions,
            replicas=replicas,
            verbose=self._config.verbose,
        )
        neo.start()
        neo_storage = neo.getZODBStorage()
        # import
        start = time()
        try:
            try:
                neo_storage.copyTransactionsFrom(dfs_storage)
                end = time()
                return dfs_storage.getSize() / ((end - start) * 1e3)
            except:
                traceback.print_exc()
                self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
                    masters, storages, replicas, partitions)
                self.error_log += "\n%s\n" % ''.join(traceback.format_exc())
                return None
        finally:
            neo.stop()

    def buildReport(self, storages, replicas, results):
        config = self._config
        self.add_status('Min storages', config.min_s)
        self.add_status('Max storages', config.max_s)
        self.add_status('Min replicas', config.min_r)
        self.add_status('Max replicas', config.max_r)
        # draw an array with results
        fmt = '|' + '|'.join(['  %8s  '] * (len(replicas) + 1)) + '|\n'
        sep = '+' + '+'.join(['-' * 12] * (len(replicas) + 1)) + '+\n'
        report = sep
        report += fmt % tuple(['S\R'] + range(0, len(replicas)))
        report += sep
        failures = 0
        speedlist = []
        for s in storages:
            values = []
            assert s in results
            for r in replicas:
                if r in results[s]:
                    if results[s][r] is None:
                        values.append('FAIL')
                        failures += 1
                    else:
                        values.append('%8.1f' % results[s][r])
                        speedlist.append(results[s][r])
                else:
                    values.append('N/A')
            report += fmt % (tuple([s] + values))
            report += sep
        report += self.error_log
        if failures:
            info = '%d failures' % (failures, )
        else:
            info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
        return info, report

def main(args=None):
    MatrixImportBenchmark().run()

if __name__ == "__main__":
    main()