1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
#!/usr/bin/env python
import sys
import os
import math
import traceback
from time import time
from neo.tests.benchmark import BenchmarkRunner
from ZODB.FileStorage import FileStorage
class MatrixImportBenchmark(BenchmarkRunner):
    """Benchmark NEO import throughput over a storages x replicas matrix.

    For every (storage count, replica count) pair with replicas < storages,
    a fresh NEO cluster is started and the source storage (the FileStorage
    given with --datafs, or the PROD1 generator from neo.tests.stat_zodb
    otherwise) is copied into it with copyTransactionsFrom().  The whole
    matrix is run ``repeat`` times and the best throughput of each cell is
    reported.
    """

    # Accumulated tracebacks of failed imports, appended to the report.
    # The ``+=`` in runImport rebinds it as an instance attribute.
    error_log = ''

    def add_options(self, parser):
        """Register the command-line options specific to this benchmark."""
        parser.add_option('-d', '--datafs')
        parser.add_option('', '--min-storages', type='int', default=1)
        parser.add_option('', '--max-storages', type='int', default=2)
        parser.add_option('', '--min-replicas', type='int', default=0)
        parser.add_option('', '--max-replicas', type='int', default=1)
        parser.add_option('', '--threaded', action="store_true")

    def load_options(self, options, args):
        """Validate the parsed options and return them as a config dict.

        Exits the process (sys.exit) with a message on invalid input.
        """
        if options.datafs and not os.path.exists(options.datafs):
            sys.exit('Missing or wrong data.fs argument')
        # The storage list is built from powers of two starting at
        # floor(log2(min)), so at least one storage node is required and
        # the range must not be empty.
        if not 1 <= options.min_storages <= options.max_storages:
            sys.exit('Invalid storage range: need'
                     ' 1 <= --min-storages <= --max-storages')
        if not 0 <= options.min_replicas <= options.max_replicas:
            sys.exit('Invalid replica range: need'
                     ' 0 <= --min-replicas <= --max-replicas')
        return dict(
            datafs=options.datafs,
            min_s=options.min_storages,
            max_s=options.max_storages,
            min_r=options.min_replicas,
            max_r=options.max_replicas,
            threaded=options.threaded,
        )

    def start(self):
        """Run the whole matrix ``repeat`` times and build the report.

        Storage counts grow logarithmically (powers of two, clamped to the
        configured range); replica counts grow linearly.
        Returns the ``(info, report)`` pair produced by buildReport().
        """
        config = self._config
        storages = self._buildStorageList(config.min_s, config.max_s)
        replicas = list(range(config.min_r, config.max_r + 1))
        result_list = [self.runMatrix(storages, replicas)
                       for _ in range(config.repeat)]
        # For each cell keep the best throughput among successful runs;
        # a cell is None only when every repeat failed.
        results = {}
        for s in storages:
            results[s] = row = {}
            for r in replicas:
                if r < s:
                    speeds = [stats[s][r] for stats in result_list
                              if stats[s][r] is not None]
                    row[r] = max(speeds) if speeds else None
        return self.buildReport(storages, replicas, results)

    @staticmethod
    def _buildStorageList(min_s, max_s):
        """Return powers of two covering [min_s, max_s], clamped to its ends.

        Assumes 1 <= min_s <= max_s (enforced by load_options).
        """
        # n.bit_length() - 1 == floor(log2(n)) exactly for n >= 1, unlike
        # int(math.log(n, 2)) which is subject to float rounding.
        storages = [2 ** x for x in range(min_s.bit_length() - 1,
                                          max_s.bit_length())]
        if storages[0] < min_s:
            storages[0] = min_s
        if storages[-1] < max_s:
            storages.append(max_s)
        return storages

    def runMatrix(self, storages, replicas):
        """Run one import (1 master, 100 partitions) per valid cell.

        Returns ``{storage_count: {replica_count: KB/s or None}}``; cells
        where replicas >= storages are skipped (absent from the inner dict).
        """
        stats = {}
        for s in storages:
            stats[s] = row = {}
            for r in replicas:
                if r < s:
                    row[r] = self.runImport(1, s, r, 100)
        return stats

    def runImport(self, masters, storages, replicas, partitions):
        """Import the source storage into a freshly started NEO cluster.

        Returns the source storage's getSize() divided by the elapsed time
        in milliseconds (reported as KB/s), or None if the copy raised; in
        that case the traceback is printed and appended to ``error_log``.
        Errors while starting the cluster propagate to the caller.
        """
        datafs = self._config.datafs
        if datafs:
            # The source is only read, so open it read-only.
            dfs_storage = FileStorage(file_name=datafs, read_only=True)
        else:
            datafs = 'PROD1'
            import random
            from neo.tests import stat_zodb
            dfs_storage = getattr(stat_zodb, datafs)(
                random.Random(0)).as_storage(10000)
        try:
            print("Import of %s with m=%s, s=%s, r=%s, p=%s" % (
                datafs, masters, storages, replicas, partitions))
            if self._config.threaded:
                from neo.tests.threaded import NEOCluster
            else:
                from neo.tests.functional import NEOCluster
            cluster = NEOCluster(
                db_list=['neot_matrix_%d' % i for i in range(storages)],
                clear_databases=True,
                master_count=masters,
                partitions=partitions,
                replicas=replicas,
                verbose=self._config.verbose,
            )
            try:
                # Started inside try/finally so the cluster is stopped even
                # if startup or client creation fails.
                cluster.start()
                neo_storage = cluster.getZODBStorage()
                start = time()
                try:
                    neo_storage.copyTransactionsFrom(dfs_storage)
                    end = time()
                    return dfs_storage.getSize() / ((end - start) * 1e3)
                except Exception:
                    # Record the failure and keep benchmarking other cells;
                    # KeyboardInterrupt/SystemExit still abort the run.
                    traceback.print_exc()
                    self.error_log += "Import with m=%s, s=%s, r=%s, p=%s:" % (
                        masters, storages, replicas, partitions)
                    self.error_log += "\n%s\n" % traceback.format_exc()
                    return None
            finally:
                cluster.stop()
        finally:
            # The stat_zodb storage may not implement close(); only call it
            # when available.
            close = getattr(dfs_storage, 'close', None)
            if close is not None:
                close()

    def buildReport(self, storages, replicas, results):
        """Format the results matrix as an ASCII table.

        Cells show the throughput, 'FAIL' (every repeat failed) or 'N/A'
        (replicas >= storages, not run).  Returns ``(info, report)`` where
        info is the failure count if any cell failed, the mean throughput of
        the successful cells otherwise, and report is the table followed by
        the error log.
        """
        config = self._config
        self.add_status('Min storages', config.min_s)
        self.add_status('Max storages', config.max_s)
        self.add_status('Min replicas', config.min_r)
        self.add_status('Max replicas', config.max_r)
        # One row-label column plus one column per replica count.  Each cell
        # is ' %8s ' (10 characters), so separators use 10 dashes.
        cell_count = len(replicas) + 1
        fmt = '|' + '|'.join([' %8s '] * cell_count) + '|\n'
        sep = '+' + '+'.join(['-' * 10] * cell_count) + '+\n'
        # Column headers are the actual replica counts, not column indexes.
        lines = [sep, fmt % tuple(['S\\R'] + list(replicas)), sep]
        failures = 0
        speedlist = []
        for s in storages:
            assert s in results
            values = []
            for r in replicas:
                if r not in results[s]:
                    values.append('N/A')
                elif results[s][r] is None:
                    values.append('FAIL')
                    failures += 1
                else:
                    values.append('%8.1f' % results[s][r])
                    speedlist.append(results[s][r])
            lines.append(fmt % tuple([s] + values))
        lines.append(sep)
        lines.append(self.error_log)
        if failures:
            info = '%d failures' % (failures, )
        elif speedlist:
            info = '%.1f KB/s' % (sum(speedlist) / len(speedlist))
        else:
            # No cell satisfied replicas < storages: nothing was measured.
            info = 'no import run'
        return info, ''.join(lines)
def main(args=None):
    """Run the matrix import benchmark.

    ``args`` is accepted for interface compatibility but is not used;
    presumably BenchmarkRunner.run() reads the command line itself --
    verify against neo.tests.benchmark.
    """
    benchmark = MatrixImportBenchmark()
    benchmark.run()


if __name__ == "__main__":
    main()