Commit ee6b1f16 authored by Vincent Pelletier's avatar Vincent Pelletier

Make replication support whole-object deletion.

This can happen if object creation was transactionally undone, and database
was packed.
Update test.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2440 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 4dbd1326
...@@ -132,32 +132,31 @@ class ReplicationHandler(EventHandler): ...@@ -132,32 +132,31 @@ class ReplicationHandler(EventHandler):
def answerObjectHistoryFrom(self, conn, object_dict): def answerObjectHistoryFrom(self, conn, object_dict):
app = self.app app = self.app
ask = conn.ask ask = conn.ask
my_object_dict = app.replicator.getObjectHistoryFromResult()
deleteObject = app.dm.deleteObject deleteObject = app.dm.deleteObject
my_object_dict = app.replicator.getObjectHistoryFromResult()
object_set = set()
max_oid = max(object_dict.iterkeys()) max_oid = max(object_dict.iterkeys())
max_serial = max(object_dict[max_oid]) max_serial = max(object_dict[max_oid])
for oid, serial_list in object_dict.iteritems(): for oid, serial_list in object_dict.iteritems():
# Check if I have objects, request those which I don't have. for serial in serial_list:
if oid in my_object_dict: object_set.add((oid, serial))
# We must ignore extra serials we might have locally found for my_object_set = set()
# last received oid, as they can just be present in our list for oid, serial_list in my_object_dict.iteritems():
# because we lacked some records (hence, we would have fetched if oid > max_oid:
# rows further than other node for the same number of rows). continue
if oid == max_oid: elif oid == max_oid:
my_serial_list = (x for x in my_object_dict[oid] filter = lambda x: x <= max_serial
if x <= max_serial)
else:
my_serial_list = my_object_dict[oid]
my_serial_set = frozenset(my_serial_list)
serial_set = frozenset(serial_list)
extra_serial_set = my_serial_set - serial_set
for serial in extra_serial_set:
deleteObject(oid, serial)
missing_serial_set = serial_set - my_serial_set
else: else:
missing_serial_set = serial_list filter = lambda x: True
for serial in missing_serial_set: for serial in serial_list:
ask(Packets.AskObject(oid, serial, None), timeout=300) if filter(serial):
my_object_set.add((oid, serial))
extra_object_set = my_object_set - object_set
for oid, serial in extra_object_set:
deleteObject(oid, serial)
missing_object_set = object_set - my_object_set
for oid, serial in missing_object_set:
ask(Packets.AskObject(oid, serial, None), timeout=300)
ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1), ask(self._doAskCheckSerialRange(max_oid, add64(max_serial, 1),
RANGE_LENGTH)) RANGE_LENGTH))
......
...@@ -223,11 +223,12 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -223,11 +223,12 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
oid_2 = self.getOID(2) oid_2 = self.getOID(2)
oid_3 = self.getOID(3) oid_3 = self.getOID(3)
oid_4 = self.getOID(4) oid_4 = self.getOID(4)
oid_5 = self.getOID(5)
tid_list = [self.getOID(x) for x in xrange(7)] tid_list = [self.getOID(x) for x in xrange(7)]
oid_dict = FakeDict(( oid_dict = FakeDict((
(oid_1, [tid_list[0], tid_list[1]]), (oid_1, [tid_list[0], tid_list[1]]),
(oid_2, [tid_list[3]]), (oid_2, [tid_list[2], tid_list[3]]),
(oid_3, [tid_list[5]]), (oid_4, [tid_list[5]]),
)) ))
flat_oid_list = [] flat_oid_list = []
for oid, serial_list in oid_dict.iteritems(): for oid, serial_list in oid_dict.iteritems():
...@@ -236,7 +237,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -236,7 +237,7 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
app = self.getApp(conn=conn, history_result={}) app = self.getApp(conn=conn, history_result={})
# With no known OID/Serial # With no known OID/Serial
ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict) ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
self._checkPacketSerialList(conn, flat_oid_list, oid_3, tid_list[5], self._checkPacketSerialList(conn, flat_oid_list, oid_4, tid_list[5],
app) app)
# With some known OID/Serials # With some known OID/Serials
# For test to be realist, history_result should contain the same # For test to be realist, history_result should contain the same
...@@ -245,17 +246,23 @@ class StorageReplicationHandlerTests(NeoUnitTestBase): ...@@ -245,17 +246,23 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
conn = self.getFakeConnection() conn = self.getFakeConnection()
app = self.getApp(conn=conn, history_result={ app = self.getApp(conn=conn, history_result={
oid_1: [oid_dict[oid_1][0], ], oid_1: [oid_dict[oid_1][0], ],
oid_3: [tid_list[4], oid_dict[oid_3][0], tid_list[6]], oid_3: [tid_list[2]],
oid_4: [tid_list[2], ], oid_4: [tid_list[4], oid_dict[oid_4][0], tid_list[6]],
oid_5: [tid_list[6]],
}) })
ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict) ReplicationHandler(app).answerObjectHistoryFrom(conn, oid_dict)
self._checkPacketSerialList(conn, ( self._checkPacketSerialList(conn, (
(oid_1, oid_dict[oid_1][1]), (oid_1, oid_dict[oid_1][1]),
(oid_2, oid_dict[oid_2][0]), (oid_2, oid_dict[oid_2][0]),
), oid_3, tid_list[5], app) (oid_2, oid_dict[oid_2][1]),
), oid_4, tid_list[5], app)
calls = app.dm.mockGetNamedCalls('deleteObject') calls = app.dm.mockGetNamedCalls('deleteObject')
self.assertEqual(len(calls), 1) actual_deletes = set(((x.getParam(0), x.getParam(1)) for x in calls))
calls[0].checkArgs(oid_3, tid_list[4]) expected_deletes = set((
(oid_3, tid_list[2]),
(oid_4, tid_list[4]),
))
self.assertEqual(actual_deletes, expected_deletes)
def test_answerObject(self): def test_answerObject(self):
conn = self.getFakeConnection() conn = self.getFakeConnection()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment