Commit 6b1f198f authored by Julien Muchembled's avatar Julien Muchembled

storage: always restart replication of outdated cells from the beginning (ZERO_TID)

This is a workaround to fix holes if replication is interrupted after new data
is committed.
parent 949f7e0f
...@@ -86,6 +86,9 @@ ...@@ -86,6 +86,9 @@
not degrade performance for client nodes. But when there's only 1 storage not degrade performance for client nodes. But when there's only 1 storage
left for a partition, it may be wanted to guarantee a minimum speed to left for a partition, it may be wanted to guarantee a minimum speed to
avoid complete data loss if another failure happens too early. avoid complete data loss if another failure happens too early.
- Find a way not to always start replication from the beginning. Currently,
a temporarily down nodes can't replicate from where it was interrupted,
which is an issue on big databases. (SPEED)
- Pack segmentation & throttling (HIGH AVAILABILITY) - Pack segmentation & throttling (HIGH AVAILABILITY)
In its current implementation, pack runs in one call on all storage nodes In its current implementation, pack runs in one call on all storage nodes
at the same time, which lcoks down the whole cluster. This task should at the same time, which lcoks down the whole cluster. This task should
......
...@@ -233,6 +233,12 @@ class DatabaseManager(object): ...@@ -233,6 +233,12 @@ class DatabaseManager(object):
tid = max(tid, max(obj.itervalues())) tid = max(tid, max(obj.itervalues()))
else: else:
tid = max(obj.itervalues()) if obj else None tid = max(obj.itervalues()) if obj else None
# TODO: Replication can't be resumed from the tids in 'trans' and 'obj'
# because outdated cells are writable and may contain recently
# committed data. We must save somewhere where replication was
# interrupted and return this information. For the moment, we
# tell the replicator to resume from the beginning.
trans = obj = {}
return tid, trans, obj, oid return tid, trans, obj, oid
def getUnfinishedTIDList(self): def getUnfinishedTIDList(self):
......
...@@ -122,24 +122,6 @@ class StorageDBTests(NeoUnitTestBase): ...@@ -122,24 +122,6 @@ class StorageDBTests(NeoUnitTestBase):
def checkSet(self, list1, list2): def checkSet(self, list1, list2):
self.assertEqual(set(list1), set(list2)) self.assertEqual(set(list1), set(list2))
def test_getLastIDs(self):
tid1, tid2, tid3, tid4 = self.getTIDs(4)
oid1, oid2 = self.getOIDs(2)
txn, objs = self.getTransaction([oid1, oid2])
self.db.storeTransaction(tid1, objs, txn, False)
self.db.storeTransaction(tid2, objs, txn, False)
self.assertEqual(self.db.getLastIDs(),
(tid2, {0: tid2}, {0: tid2}, oid2))
self.db.storeTransaction(tid3, objs, txn)
tids = {0: tid2, None: tid3}
self.assertEqual(self.db.getLastIDs(), (tid3, tids, tids, oid2))
self.db.storeTransaction(tid4, objs, None)
self.assertEqual(self.db.getLastIDs(),
(tid4, tids, {0: tid2, None: tid4}, oid2))
self.db.finishTransaction(tid3)
self.assertEqual(self.db.getLastIDs(),
(tid4, {0: tid3}, {0: tid3, None: tid4}, oid2))
def test_getUnfinishedTIDList(self): def test_getUnfinishedTIDList(self):
tid1, tid2, tid3, tid4 = self.getTIDs(4) tid1, tid2, tid3, tid4 = self.getTIDs(4)
oid1, oid2 = self.getOIDs(2) oid1, oid2 = self.getOIDs(2)
......
...@@ -21,6 +21,7 @@ from collections import defaultdict ...@@ -21,6 +21,7 @@ from collections import defaultdict
from functools import wraps from functools import wraps
from neo.lib import logging from neo.lib import logging
from neo.storage.checker import CHECK_COUNT from neo.storage.checker import CHECK_COUNT
from neo.storage.replicator import Replicator
from neo.lib.connector import SocketConnector from neo.lib.connector import SocketConnector
from neo.lib.connection import ClientConnection from neo.lib.connection import ClientConnection
from neo.lib.event import EventManager from neo.lib.event import EventManager
...@@ -352,6 +353,36 @@ class ReplicationTests(NEOThreadedTest): ...@@ -352,6 +353,36 @@ class ReplicationTests(NEOThreadedTest):
cluster.stop() cluster.stop()
cluster.reset(True) cluster.reset(True)
def testResumingReplication(self):
cluster = NEOCluster(replicas=1)
try:
s0, s1 = cluster.storage_list
cluster.start(storage_list=(s0,))
t, c = cluster.getTransaction()
r = c.root()
r._p_changed = 1
t.commit()
s1.start()
self.tic()
with Patch(Replicator, connected=lambda *_: None):
cluster.enableStorageList((s1,))
cluster.neoctl.tweakPartitionTable()
r._p_changed = 1
t.commit()
self.tic()
s1.stop()
cluster.join((s1,))
t0, t1, t2 = c._storage.iterator()
s1.resetNode()
s1.start()
self.tic()
self.assertEqual([], cluster.getOutdatedCells())
s0.stop()
cluster.join((s0,))
t0, t1, t2 = c._storage.iterator()
finally:
cluster.stop()
def testCheckReplicas(self): def testCheckReplicas(self):
from neo.storage import checker from neo.storage import checker
def corrupt(offset): def corrupt(offset):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment