Commit aa7b654f authored by Julien Muchembled's avatar Julien Muchembled

qa: add testIncremental (testImporter) test

parent d5834ee9
...@@ -17,11 +17,11 @@ ...@@ -17,11 +17,11 @@
from cPickle import Pickler, Unpickler from cPickle import Pickler, Unpickler
from cStringIO import StringIO from cStringIO import StringIO
from itertools import izip_longest from itertools import izip_longest
import os, random, shutil, time, unittest import os, random, shutil, threading, time, unittest
import transaction, ZODB import transaction, ZODB
from neo.client.exception import NEOPrimaryMasterLost from neo.client.exception import NEOPrimaryMasterLost
from neo.lib import logging from neo.lib import logging
from neo.lib.util import p64, u64 from neo.lib.util import cached_property, p64, u64
from neo.master.transactions import TransactionManager from neo.master.transactions import TransactionManager
from neo.storage.database import getAdapterKlass, importer, manager from neo.storage.database import getAdapterKlass, importer, manager
from neo.storage.database.importer import \ from neo.storage.database.importer import \
...@@ -130,31 +130,51 @@ class ImporterTests(NEOThreadedTest): ...@@ -130,31 +130,51 @@ class ImporterTests(NEOThreadedTest):
self.assertIs(Obj, load()) self.assertIs(Obj, load())
self.assertDictEqual(state, load()) self.assertDictEqual(state, load())
def _importFromFileStorage(self, multi=(), @cached_property
root_filter=None, sub_filter=None): def getFS(self):
import_hash = '1d4ff03730fe6bcbf235e3739fbe5f5b' fs_dir = os.path.join(getTempDirectory(), self.id())
shutil.rmtree(fs_dir, 1) # for --loop
os.mkdir(fs_dir)
def getFS(db='root'):
path = os.path.join(fs_dir, '%s.fs' % db)
return path, {
"storage": "<filestorage>\npath %s\n</filestorage>" % path
}
return getFS
def getData(self, tree=random_tree.generateTree(random.Random(0))):
txn_size = 10 txn_size = 10
tree = random_tree.generateTree(random.Random(0))
i = len(tree) // 3 i = len(tree) // 3
assert i > txn_size assert i > txn_size
before_tree = tree[:i] before_tree = tree[:i]
after_tree = tree[i:] after_tree = tree[i:]
fs_dir = os.path.join(getTempDirectory(), self.id()) def beforeCheck(h, count=52):
shutil.rmtree(fs_dir, 1) # for --loop self.assertEqual(count, h())
os.mkdir(fs_dir) self.assertEqual('1d4ff03730fe6bcbf235e3739fbe5f5b', h.hexdigest())
def finalCheck(r):
h = random_tree.hashTree(r)
self.assertEqual(93, h())
self.assertEqual('6bf0f0cb2d6c1aae9e52c412ef0e25b6', h.hexdigest())
return (
beforeCheck,
lambda r, *f: random_tree.importTree(r, before_tree, txn_size, *f),
finalCheck,
lambda r: random_tree.importTree(r, after_tree, txn_size),
)
def _importFromFileStorage(self, multi=(),
root_filter=None, sub_filter=None):
beforeCheck, before, finalCheck, after = self.getData()
iter_list = [] iter_list = []
db_list = [] db_list = []
# Setup several FileStorage databases. # Setup several FileStorage databases.
for i, db in enumerate(('root',) + multi): for i, db in enumerate(('root',) + multi):
fs_path = os.path.join(fs_dir, '%s.fs' % db) fs_path, cfg = self.getFS(db)
c = ZODB.DB(FileStorage(fs_path)).open() c = ZODB.DB(FileStorage(fs_path)).open()
r = c.root()['tree'] = random_tree.Node() r = c.root()['tree'] = random_tree.Node()
transaction.commit() transaction.commit()
iter_list.append(random_tree.importTree(r, before_tree, txn_size, iter_list.append(before(r, sub_filter(db) if i else root_filter))
sub_filter(db) if i else root_filter)) db_list.append((db, r, cfg))
db_list.append((db, r, {
"storage": "<filestorage>\npath %s\n</filestorage>" % fs_path
}))
# Populate FileStorage databases. # Populate FileStorage databases.
for i, iter_list in enumerate(izip_longest(*iter_list)): for i, iter_list in enumerate(izip_longest(*iter_list)):
for r in iter_list: for r in iter_list:
...@@ -169,9 +189,7 @@ class ImporterTests(NEOThreadedTest): ...@@ -169,9 +189,7 @@ class ImporterTests(NEOThreadedTest):
for x in multi: for x in multi:
cfg['_%s' % x] = str(u64(r[x]._p_oid)) cfg['_%s' % x] = str(u64(r[x]._p_oid))
else: else:
h = random_tree.hashTree(r) beforeCheck(random_tree.hashTree(r))
h()
self.assertEqual(import_hash, h.hexdigest())
importer['writeback'] = 'true' importer['writeback'] = 'true'
else: else:
cfg["oid"] = str(u64(r[db]._p_oid)) cfg["oid"] = str(u64(r[db]._p_oid))
...@@ -216,12 +234,10 @@ class ImporterTests(NEOThreadedTest): ...@@ -216,12 +234,10 @@ class ImporterTests(NEOThreadedTest):
logging.info("start migration") logging.info("start migration")
dm.doOperation(cluster.storage) dm.doOperation(cluster.storage)
# Adjust if needed. Must remain > 0. # Adjust if needed. Must remain > 0.
self.assertEqual(22, h()) beforeCheck(h, 22)
self.assertEqual(import_hash, h.hexdigest())
# New writes after the switch to NEO. # New writes after the switch to NEO.
last_import = -1 last_import = -1
for i, r in enumerate(random_tree.importTree( for i, r in enumerate(after(r)):
r, after_tree, txn_size)):
t.commit() t.commit()
if cluster.storage.dm._import: if cluster.storage.dm._import:
last_import = i last_import = i
...@@ -230,11 +246,6 @@ class ImporterTests(NEOThreadedTest): ...@@ -230,11 +246,6 @@ class ImporterTests(NEOThreadedTest):
assert i < last_import * 3 < 2 * i, (last_import, i) assert i < last_import * 3 < 2 * i, (last_import, i)
self.assertFalse(cluster.storage.dm._import) self.assertFalse(cluster.storage.dm._import)
storage._cache.clear() storage._cache.clear()
def finalCheck(r):
h = random_tree.hashTree(r)
self.assertEqual(93, h())
self.assertEqual('6bf0f0cb2d6c1aae9e52c412ef0e25b6',
h.hexdigest())
finalCheck(r) finalCheck(r)
if dm._writeback: if dm._writeback:
dm.commit() dm.commit()
...@@ -305,5 +316,52 @@ class ImporterTests(NEOThreadedTest): ...@@ -305,5 +316,52 @@ class ImporterTests(NEOThreadedTest):
# merge several DB. # merge several DB.
testMerge = expectedFailure(NEOPrimaryMasterLost)(testMerge) testMerge = expectedFailure(NEOPrimaryMasterLost)(testMerge)
def testIncremental(self):
"""
This reproduces an undocumented way to speed up the import of a single
ZODB by doing most of the work before switching to NEO.
"""
beforeCheck, before, finalCheck, after = self.getData()
fs_path, cfg = self.getFS()
c = ZODB.DB(FileStorage(fs_path)).open()
r = c.root()['tree'] = random_tree.Node()
transaction.commit()
for _ in before(r):
transaction.commit()
c.db().close()
importer = {'zodb': [('root', cfg)]}
# Start NEO cluster with transparent import.
with NEOCluster(importer=importer, partitions=2) as cluster:
s = cluster.storage
l = threading.Lock()
l.acquire()
def _finished(orig):
orig()
l.release()
with Patch(s.dm, _finished=_finished):
cluster.start()
l.acquire()
t, c = cluster.getTransaction()
r = c.root()['tree']
beforeCheck(random_tree.hashTree(r))
c = ZODB.DB(FileStorage(fs_path)).open()
for _ in after(c.root()['tree']):
transaction.commit()
c.db().close()
# TODO: Add a storage option that only does this and exits.
# Such command would also check that there's no data after
# what's already imported.
s.dm.setConfiguration('zodb', None)
s.stop()
cluster.join((s,))
s.resetNode()
with Patch(s.dm, _finished=_finished):
s.start()
self.tic()
l.acquire()
t.begin()
finalCheck(r)
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment