Commit d4ea398d authored by Julien Muchembled

storage: also commit updated cell TID at each replicated chunk of 'obj' records

This is a follow-up of commit b3dd6973
("Optimize resumption of replication by starting from a greater TID").
I missed the case where a storage node is restarted while it is replicating:
it lost the TID at which it was interrupted.

Although we commit after each replicated chunk to avoid transferring all
the data again from the beginning, the node could still waste time checking
that the data are already replicated.
parent 745ee2b2
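
To make the resumption pattern concrete, here is a minimal sketch, not NEO's actual API: fetch_chunk, getCellTID and storeData are hypothetical stand-ins, while updateCellTID and commit are the calls this commit adds. Progress is committed together with each replicated chunk of 'obj' records, so a restarted node resumes from the last committed cell TID instead of ZERO_TID.

    ZERO_TID = b'\0' * 8

    def replicate_partition(dm, partition, fetch_chunk):
        # Resume from the TID committed by a previous, interrupted run
        # (getCellTID is a hypothetical getter for the committed cell TID).
        tid = dm.getCellTID(partition) or ZERO_TID
        while True:
            # Fetch at most one chunk of 'obj' records with TID > tid.
            records, next_tid = fetch_chunk(partition, tid)
            for oid, serial, data in records:
                dm.storeData(oid, serial, data)  # hypothetical store call
            if next_tid is None:
                return  # partition fully replicated
            # Persist progress together with the chunk: a crash now costs
            # at most the current chunk, not a re-check from ZERO_TID.
            dm.updateCellTID(partition, next_tid)
            dm.commit()
            tid = next_tid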
@@ -75,9 +75,6 @@ class StorageOperationHandler(EventHandler):
                 deleteTransaction(tid)
         assert not pack_tid, "TODO"
         if next_tid:
-            # More than one chunk ? This could be a full replication so avoid
-            # restarting from the beginning by committing now.
-            self.app.dm.commit()
             self.app.replicator.fetchTransactions(next_tid)
         else:
             self.app.replicator.fetchObjects()
@@ -99,7 +96,6 @@ class StorageOperationHandler(EventHandler):
                     deleteObject(oid, serial)
         assert not pack_tid, "TODO"
         if next_tid:
-            self.app.dm.commit() # like in answerFetchTransactions
             # TODO also provide feedback to master about current replication state (tid)
             self.app.replicator.fetchObjects(next_tid, next_oid)
         else:
@@ -366,6 +366,9 @@ class Replicator(object):
         offset = self.current_partition
         p = self.partition_dict[offset]
         if min_tid:
+            # More than one chunk ? This could be a full replication so avoid
+            # restarting from the beginning by committing now.
+            self.app.dm.commit()
             p.next_trans = min_tid
         else:
             try:
@@ -390,13 +393,17 @@ class Replicator(object):
         offset = self.current_partition
         p = self.partition_dict[offset]
         max_tid = self.replicate_tid
+        dm = self.app.dm
         if min_tid:
             p.next_obj = min_tid
+            self.updateBackupTID()
+            dm.updateCellTID(offset, add64(min_tid, -1))
+            dm.commit() # like in fetchTransactions
         else:
             min_tid = p.next_obj
             p.next_trans = add64(max_tid, 1)
         object_dict = {}
-        for serial, oid in self.app.dm.getReplicationObjectList(min_tid,
+        for serial, oid in dm.getReplicationObjectList(min_tid,
                 max_tid, FETCH_COUNT, offset, min_oid):
             try:
                 object_dict[serial].append(oid)
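
About add64(min_tid, -1) above: min_tid is the first TID of the next, not-yet-replicated chunk, so the cell TID is committed as the TID just before it. For illustration, a stand-alone sketch of such 64-bit TID arithmetic, assuming (as in NEO) that TIDs are 8-byte big-endian packed integers; NEO defines an equivalent add64 helper in its util module:

    from struct import pack, unpack

    def add64(packed, offset):
        # Add a signed integer offset to an 8-byte big-endian packed value.
        return pack('!Q', unpack('!Q', packed)[0] + offset)

    ZERO_TID = b'\0' * 8
    assert add64(add64(ZERO_TID, 1), -1) == ZERO_TID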
@@ -812,6 +812,48 @@ class ReplicationTests(NEOThreadedTest):
         self.tic()
         self.assertEqual(2, self.checkBackup(backup))
 
+    @with_cluster(start_cluster=0, replicas=1)
+    def testStoppingDuringReplication(self, cluster):
+        """
+        When a node is stopped while it is replicating obj from ZERO_TID,
+        check that replication does not resume from the beginning.
+        """
+        s1, s2 = cluster.storage_list
+        cluster.start(storage_list=(s1,))
+        t, c = cluster.getTransaction()
+        r = c.root()
+        r._p_changed = 1
+        t.commit()
+        ltid = r._p_serial
+        trans = []
+        obj = []
+        with ConnectionFilter() as f, Patch(replicator, FETCH_COUNT=1):
+            @f.add
+            def delayReplicate(conn, packet):
+                if isinstance(packet, Packets.AskFetchTransactions):
+                    trans.append(packet.decode()[2])
+                elif isinstance(packet, Packets.AskFetchObjects):
+                    if obj:
+                        return True
+                    obj.append(packet.decode()[2])
+            s2.start()
+            self.tic()
+            cluster.neoctl.enableStorageList([s2.uuid])
+            cluster.neoctl.tweakPartitionTable()
+            self.tic()
+            self.assertEqual(trans, [ZERO_TID, ltid])
+            self.assertEqual(obj, [ZERO_TID])
+            self.assertPartitionTable(cluster, 'UO')
+            s2.stop()
+            cluster.join((s2,))
+            s2.resetNode()
+            del trans[:], obj[:]
+            s2.start()
+            self.tic()
+            self.assertEqual(trans, [ltid])
+            self.assertEqual(obj, [ltid])
+            self.assertPartitionTable(cluster, 'UU')
+
     @with_cluster(start_cluster=0, replicas=1, partitions=2)
     def testReplicationBlockedByUnfinished1(self, cluster,
             delay_replication=False):
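
The assertions pin down the fix: on the first attempt, the new node fetches both trans and obj starting from ZERO_TID, and the filter holds back the second AskFetchObjects so that s2 is stopped mid-replication; after the restart, both fetches start directly at ltid, right after the cell TID committed with the first 'obj' chunk, instead of falling back to ZERO_TID.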