From b9b47dd3d193983d554809974e60b84c6ff3505a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Wisniewski?= <gregory@nexedi.com> Date: Wed, 5 Aug 2009 15:57:14 +0000 Subject: [PATCH] Add a second test for storage replication, factorise the first and add an helper in NeoCluster class. git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1201 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/tests/functional/__init__.py | 10 +++ neo/tests/functional/testStorage.py | 111 +++++++++++++++++++--------- 2 files changed, 86 insertions(+), 35 deletions(-) diff --git a/neo/tests/functional/__init__.py b/neo/tests/functional/__init__.py index 17ff89dd..eed732eb 100644 --- a/neo/tests/functional/__init__.py +++ b/neo/tests/functional/__init__.py @@ -412,6 +412,16 @@ class NEOCluster(object): return uuid is None or uuid == current_try, current_try self.expectCondition(callback, timeout, delay) + def expectNoOudatedCells(self, timeout=0, delay=1): + def callback(last_try): + row_list = self.neoctl.getPartitionRowList()[1] + for row in row_list: + for cell in row[1]: + if cell[1] != protocol.UP_TO_DATE_STATE: + return False, last_try + return True, last_try + self.expectCondition(callback, timeout, delay) + def expectClusterState(self, state, timeout=0, delay=1): def callback(last_try): current_try = self.neoctl.getClusterState() diff --git a/neo/tests/functional/testStorage.py b/neo/tests/functional/testStorage.py index 7aceeb84..98b638d4 100644 --- a/neo/tests/functional/testStorage.py +++ b/neo/tests/functional/testStorage.py @@ -15,18 +15,15 @@ # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. -import os -import time import ZODB import MySQLdb import unittest -import tempfile import transaction -from ZODB.FileStorage import FileStorage from Persistence import Persistent from neo.tests.functional import NEOCluster from neo.client.Storage import Storage as NEOStorage +from neo import protocol class PObject(Persistent): @@ -34,11 +31,12 @@ class PObject(Persistent): self.value = value -class ImportExportTests(unittest.TestCase): +OBJECT_NUMBER = 100 + +class StorageTests(unittest.TestCase): def setUp(self): - self.temp_dir = tempfile.mkdtemp(prefix='neo_import_export_') - print "using the temp directory %s" % self.temp_dir + pass def tearDown(self): pass @@ -47,11 +45,53 @@ class ImportExportTests(unittest.TestCase): db.query(query) result = db.store_result().fetch_row()[0][0] return result + + def populate(self, storage): + db = ZODB.DB(storage=storage) + conn = db.open() + root = conn.root() + for i in xrange(OBJECT_NUMBER): + root[i] = PObject(i) + transaction.commit() + conn.close() + storage.close() + + def getNeoStorage(self, neo): + return NEOStorage(master_nodes=neo.master_nodes, + connector='SocketConnector', + name=neo.cluster_name, + ) + + def checkDatabase(self, neo, db_name): + db = MySQLdb.connect(db=db_name, user='test') + # wait for the sql transaction to be commited + def callback(last_try): + object_number = self.queryCount(db, 'select count(*) from obj') + return object_number == OBJECT_NUMBER + 2, last_try + neo.expectCondition(callback, 0, 1) + # no more temporarily objects + t_objects = self.queryCount(db, 'select count(*) from tobj') + self.assertEqual(t_objects, 0) + # One revision per object and two for the root, before and after + revisions = self.queryCount(db, 'select count(*) from obj') + self.assertEqual(revisions, OBJECT_NUMBER + 2) + # One object more for the root + query = 'select count(*) from (select * from obj group by oid) as t' + objects = self.queryCount(db, query) + self.assertEqual(objects, OBJECT_NUMBER + 1) + + def __checkReplicationDone(self, neo, databases): + # wait for replication to finish + neo.expectNoOudatedCells(timeout=10) + # check databases + for db_name in databases: + self.checkDatabase(neo, db_name) + + # check storages state + self.assertEqual(len(neo.getStorageNodeList(protocol.RUNNING_STATE)), 2) def testReplicationWithoutBreak(self): - OBJECT_NUMBER = 100 - # create a neo cluster databases = ['test_neo1', 'test_neo2'] neo = NEOCluster(databases, port_base=20000, @@ -61,34 +101,35 @@ class ImportExportTests(unittest.TestCase): neo.setupDB() neo.start() - # create a neo storage - args = {'connector': 'SocketConnector', 'name': neo.cluster_name} - storage = NEOStorage(master_nodes=neo.master_nodes, **args) - db = ZODB.DB(storage=storage) - - # populate the cluster - conn = db.open() - root = conn.root() - for i in xrange(OBJECT_NUMBER): - root[i] = PObject(i) - transaction.commit() - conn.close() - storage.close() - - # XXX: replace this sleep by a callback as done in testMaster - time.sleep(1) + # populate the cluster and check + self.populate(self.getNeoStorage(neo)) + self.__checkReplicationDone(neo, databases) - # check databases - for index, db_name in enumerate(databases): - db = MySQLdb.connect(db=db_name, user='test') - # One revision per object and two for the root, before and after - revisions = self.queryCount(db, 'select count(*) from obj') - self.assertEqual(revisions, OBJECT_NUMBER + 2) - # One object more for the root - query = 'select count(*) from (select * from obj group by oid) as t' - objects = self.queryCount(db, query) - self.assertEqual(objects, OBJECT_NUMBER + 1) + def testReplicationWithNewStorage(self): + + # create a neo cluster + databases = ['test_neo1', 'test_neo2'] + neo = NEOCluster(databases, port_base=20000, + master_node_count=2, + partitions=10, replicas=1, + ) + neo.setupDB() + + # populate one storage + new_storage = neo.getStorageProcessList()[-1] + neo.start(except_storages=[new_storage]) + self.populate(self.getNeoStorage(neo)) + + # start the second + new_storage.start() + neo.expectStorageState(new_storage.getUUID(), protocol.PENDING_STATE) + + # add it to the partition table + neo.neoctl.enableStorageList([new_storage.getUUID()]) + neo.expectStorageState(new_storage.getUUID(), protocol.RUNNING_STATE) + # wait for replication to finish then check + self.__checkReplicationDone(neo, databases) if __name__ == "__main__": -- 2.30.9