Commit 81e7018a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent eceb8b9e
......@@ -918,7 +918,6 @@ class LockInformation(Packet):
PTID('ttid'),
)
# NOTE
class InvalidateObjects(Packet):
"""
Invalidate objects. PM -> C.
......
......@@ -691,6 +691,29 @@ class NEOCluster(object):
return admin
###
# A few handy shortcuts for tests
@property
def ptid(self):
return self.neoctl.getRecovery()[0]
@property
def backup_tid(self):
return self.neoctl.getRecovery()[1]
@property
def truncated_tid(self):
return self.neoctl.getRecovery()[2]
@property
def last_tid(self):
return self.primary_master.getLastTransaction()
@property
def cluster_state(self):
return self.neoctl.getClusterState()
###
@property
def primary_master(self):
master, = [master for master in self.master_list if master.primary]
......
......@@ -53,7 +53,9 @@ def backup_test(partitions=1, upstream_kw={}, backup_kw={}):
return decorator
"""
# handy tool to get various ids of a cluster in tests
# XXX move to NEOCluster ?
class IDs:
def __init__(self, cluster):
......@@ -82,6 +84,7 @@ class IDs:
@property
def cluster_state(self):
return self.cluster.neoctl.getClusterState()
"""
......@@ -97,6 +100,7 @@ class ReplicationTests(NEOThreadedTest):
self.assertEqual(self.checksumPartition(source, partition, **kw),
self.checksumPartition(destination, partition, **kw))
# XXX add checks for cluster_state, .backup_tid & .last_tid to here ?
def checkBackup(self, cluster, **kw):
upstream_pt = cluster.upstream.primary_master.pt
pt = cluster.primary_master.pt
......@@ -125,9 +129,6 @@ class ReplicationTests(NEOThreadedTest):
importZODB(3)
backup = NEOCluster(partitions=np, replicas=nr-1, storage_count=5,
upstream=upstream)
U = IDs(upstream)
B = IDs(backup)
# U -> B propagation
try:
backup.start()
......@@ -138,8 +139,8 @@ class ReplicationTests(NEOThreadedTest):
# Normal case, following upstream cluster closely.
importZODB(17)
self.tic()
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(backup.backup_tid, upstream.last_tid)
self.assertEqual(backup.last_tid, upstream.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup))
# Check that a backup cluster can be restarted.
......@@ -153,16 +154,16 @@ class ReplicationTests(NEOThreadedTest):
ClusterStates.BACKINGUP)
importZODB(17)
self.tic()
self.assertEqual(B.backup_tid, U.last_tid)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(backup.backup_tid, upstream.last_tid)
self.assertEqual(backup.last_tid, upstream.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup))
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic()
# Stop backing up, nothing truncated.
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic()
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(backup.backup_tid, None)
self.assertEqual(backup.last_tid, upstream.last_tid)
self.assertEqual(np*nr, self.checkBackup(backup))
self.assertEqual(backup.neoctl.getClusterState(),
ClusterStates.RUNNING)
......@@ -178,41 +179,40 @@ class ReplicationTests(NEOThreadedTest):
def delaySecondary(conn, packet):
if isinstance(packet, Packets.Replicate):
tid, upstream_name, source_dict = packet.decode()
print 'REPLICATE tid: %r, upstream_name: %r, source_dict: %r' % \
(tid, upstream_name, source_dict)
#print 'REPLICATE tid: %r, upstream_name: %r, source_dict: %r' % \
# (tid, upstream_name, source_dict)
#return True
return not upstream_name and all(source_dict.itervalues())
#return upstream_name != ""
#return not upstream_name and all(source_dict.itervalues())
return upstream_name != ""
backup.reset()
try:
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
u_last_tid0 = U.last_tid
self.assertEqual(B.backup_tid, u_last_tid0)
self.assertEqual(B.last_tid, u_last_tid0)
u_last_tid0 = upstream.last_tid
self.assertEqual(backup.backup_tid, u_last_tid0)
self.assertEqual(backup.last_tid, u_last_tid0)
with backup.master.filterConnection(*backup.storage_list) as f:
f.add(delaySecondary)
while not f.filtered_count:
importZODB(1)
self.tic()
# there were new commits
self.assertGreater(U.last_tid, u_last_tid0)
self.assertGreater(upstream.last_tid, u_last_tid0)
# is not updated for data (but can be pre-updated to tid-1 on first synced txn) XXX text
self.assertLess(B.backup_tid, U.last_tid)
self.assertLess(backup.backup_tid, upstream.last_tid)
# info about last_tid is synced fully
self.assertEqual(B.last_tid, U.last_tid)
self.assertEqual(backup.last_tid, upstream.last_tid)
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic()
self.assertEqual(B.cluster_state, ClusterStates.RECOVERING)
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, U.last_tid) # not-yet truncated
self.assertEqual(backup.cluster_state, ClusterStates.RECOVERING)
self.assertEqual(backup.backup_tid, None)
self.assertEqual(backup.last_tid, upstream.last_tid) # not-yet truncated
self.tic()
self.assertEqual(B.cluster_state, ClusterStates.RUNNING)
self.assertEqual(B.backup_tid, None)
self.assertEqual(B.last_tid, u_last_tid0) # truncated after recovery
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
self.assertEqual(backup.cluster_state, ClusterStates.RUNNING)
self.assertEqual(backup.backup_tid, None)
self.assertEqual(backup.last_tid, u_last_tid0) # truncated after recovery
self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
finally:
backup.stop()
dbmanager.X = 0
......@@ -224,20 +224,25 @@ class ReplicationTests(NEOThreadedTest):
backup.start()
backup.neoctl.setClusterState(ClusterStates.STARTING_BACKUP)
self.tic()
self.assertEqual(backup.cluster_state, ClusterStates.BACKINGUP)
self.assertEqual(backup.backup_tid, upstream.last_tid) # B caught-up with U
u_last_tid1 = upstream.last_tid
with ConnectionFilter() as f:
f.add(lambda conn, packet: conn.getUUID() is None and
isinstance(packet, Packets.AddObject))
while not f.filtered_count:
importZODB(1)
self.tic()
# TODO assert B.last_tid = U.last_tid
# assert B.backup_tid < U.last_tid
self.assertEqual(backup.last_tid, upstream.last_tid)
self.assertLess(backup.backup_tid, upstream.last_tid)
backup.neoctl.setClusterState(ClusterStates.STOPPING_BACKUP)
self.tic()
# TODO assert B.last_tid = B^.backup_tid ( < U.last_tid )
self.assertEqual(backup.last_tid, u_last_tid1) # = B^.backup_tid
self.tic()
self.assertEqual(np*nr, self.checkBackup(backup,
max_tid=backup.master.getLastTransaction()))
self.assertEqual(backup.cluster_state, ClusterStates.RUNNING)
self.assertEqual(backup.backup_tid, None)
self.assertEqual(backup.last_tid, u_last_tid1) # truncated after recovery
self.assertEqual(np*nr, self.checkBackup(backup, max_tid=backup.last_tid))
finally:
backup.stop()
finally:
......@@ -258,14 +263,14 @@ class ReplicationTests(NEOThreadedTest):
if min_tid is None:
counts[0] += 1
if counts[0] > 1:
orig.im_self.app.master_conn.close()
orig.im_self.app.master_conn.close() # disconnect Sb from Mb
return orig(min_tid, min_oid)
def onTransactionCommitted(orig, txn):
counts[0] += 1
if counts[0] > 1:
node_list = orig.im_self.nm.getClientList(only_identified=True)
node_list.remove(txn.getNode())
node_list[0].getConnection().close()
node_list[0].getConnection().close() # disconnect Mb from M
return orig(txn)
upstream = NEOCluster(partitions=np, replicas=0, storage_count=1)
try:
......@@ -295,7 +300,7 @@ class ReplicationTests(NEOThreadedTest):
if event % 2:
storage = slave(primary_dict).pop()
else:
storage, partition_list = primary_dict.popitem()
storage, _ = primary_dict.popitem()
# Populate until the found storage performs
# a second replication partially and aborts.
p = Patch(backup.storage_list[storage].replicator,
......@@ -305,6 +310,9 @@ class ReplicationTests(NEOThreadedTest):
if event > 5:
backup.neoctl.checkReplicas(check_dict, ZERO_TID, None)
self.tic()
self.assertEqual(backup.cluster_state, ClusterStates.BACKINGUP)
self.assertEqual(backup.backup_tid, upstream.last_tid)
self.assertEqual(backup.last_tid, upstream.last_tid)
self.assertEqual(np*3, self.checkBackup(backup))
finally:
backup.stop()
......@@ -345,7 +353,7 @@ class ReplicationTests(NEOThreadedTest):
upstream = backup.upstream
with ConnectionFilter() as f:
f.add(lambda conn, packet:
isinstance(packet, Packets.InvalidateObjects))
isinstance(packet, Packets.InvalidateObjects)) # delay M -> Mb
upstream.importZODB()(1)
count = [0]
def _connect(orig, conn):
......@@ -379,6 +387,7 @@ class ReplicationTests(NEOThreadedTest):
upstream.importZODB()(1)
self.tic()
self.tic()
# TODO check tids
self.assertEqual(1, self.checkBackup(backup))
def testReplicationAbortedBySource(self):
......@@ -387,7 +396,7 @@ class ReplicationTests(NEOThreadedTest):
dropped, and that the out-of-date node finishes to replicate from
another source.
Here are the different states of partitions over time:
pt: 0: U|U|U
pt: 0: U..|U..|U..
pt: 0: UO.|U.O|FOO
pt: 0: UU.|U.O|FOO
pt: 0: UU.|U.U|FOO # nodes 1 & 2 replicate from node 0
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment