Commit 795082b4 authored by Vincent Pelletier's avatar Vincent Pelletier

Implement true binary search in replication.

If current chunk size is lesser than maximum, it means latest check for the
whole chunk found a problem. If current smaller check succeeded, move to
the other half and divide size by two.
Also, factorise replication "check...Range" handlers.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2405 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent a31e45bb
......@@ -26,6 +26,10 @@ from neo import util
RANGE_LENGTH = 4000
MIN_RANGE_LENGTH = 1000
CHECK_CHUNK = 0
CHECK_REPLICATE = 1
CHECK_DONE = 2
"""
Replication algorythm
......@@ -179,66 +183,99 @@ class ReplicationHandler(EventHandler):
return Packets.AskObjectHistoryFrom(min_oid, min_serial, max_serial,
length, partition_id)
@checkConnectionIsReplicatorConnection
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
max_tid):
replicator = self.app.replicator
our = replicator.getTIDCheckResult(min_tid, length)
his = (count, tid_checksum, max_tid)
p = None
if our != his:
# Something is different...
def _checkRange(self, match, current_boundary, next_boundary, length,
count):
if match:
# Same data on both sides
if length < RANGE_LENGTH and length == count:
# ...and previous check detected a difference - and we still
# haven't reached the end. This means that we just check the
# first half of a chunk which, as a whole, is different. So
# next test must happen on the next chunk.
recheck_min_boundary = next_boundary
else:
# ...and we just checked a whole chunk, move on to the next
# one.
recheck_min_boundary = None
else:
# Something is different in current chunk
recheck_min_boundary = current_boundary
if recheck_min_boundary is None:
if count == length:
# Go on with next chunk
action = CHECK_CHUNK
params = (next_boundary, RANGE_LENGTH)
else:
# No more chunks.
action = CHECK_DONE
params = None
else:
# We must recheck current chunk.
if length <= MIN_RANGE_LENGTH:
# We are already at minimum chunk length, replicate.
conn.ask(self._doAskTIDsFrom(min_tid, count))
action = CHECK_REPLICATE
params = (recheck_min_boundary, )
else:
# Check a smaller chunk.
# Note: this could be made into a real binary search, but is
# it really worth the work ?
# Note: +1, so we can detect we reached the end when answer
# comes back.
p = self._doAskCheckTIDRange(min_tid, min(length / 2,
count + 1))
if p is None:
if count == length and \
max_tid < replicator.getCurrentCriticalTID():
# Go on with next chunk
p = self._doAskCheckTIDRange(add64(max_tid, 1))
action = CHECK_CHUNK
params = (recheck_min_boundary, min(length / 2, count + 1))
return action, params
@checkConnectionIsReplicatorConnection
def answerCheckTIDRange(self, conn, min_tid, length, count, tid_checksum,
max_tid):
ask = conn.ask
replicator = self.app.replicator
next_tid = add64(max_tid, 1)
action, params = self._checkRange(
replicator.getTIDCheckResult(min_tid, length) == (
count, tid_checksum, max_tid), min_tid, next_tid, length,
count)
if action == CHECK_REPLICATE:
(min_tid, ) = params
ask(self._doAskTIDsFrom(min_tid, count))
if length == count:
action = CHECK_CHUNK
params = (next_tid, RANGE_LENGTH)
else:
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
p = self._doAskCheckSerialRange(ZERO_OID, ZERO_TID)
conn.ask(p)
action = CHECK_DONE
if action == CHECK_CHUNK:
(min_tid, count) = params
if min_tid >= replicator.getCurrentCriticalTID():
# Stop if past critical TID
action = CHECK_DONE
else:
ask(self._doAskCheckTIDRange(min_tid, count))
if action == CHECK_DONE:
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
ask(self._doAskCheckSerialRange(ZERO_OID, ZERO_TID))
@checkConnectionIsReplicatorConnection
def answerCheckSerialRange(self, conn, min_oid, min_serial, length, count,
oid_checksum, max_oid, serial_checksum, max_serial):
ask = conn.ask
replicator = self.app.replicator
our = replicator.getSerialCheckResult(min_oid, min_serial, length)
his = (count, oid_checksum, max_oid, serial_checksum, max_serial)
p = None
if our != his:
# Something is different...
if length <= MIN_RANGE_LENGTH:
# We are already at minimum chunk length, replicate.
conn.ask(self._doAskObjectHistoryFrom(min_oid, min_serial,
count))
else:
# Check a smaller chunk.
# Note: this could be made into a real binary search, but is
# it really worth the work ?
# Note: +1, so we can detect we reached the end when answer
# comes back.
p = self._doAskCheckSerialRange(min_oid, min_serial,
min(length / 2, count + 1))
if p is None:
if count == length:
# Go on with next chunk
p = self._doAskCheckSerialRange(max_oid, add64(max_serial, 1))
next_params = (max_oid, add64(max_serial, 1))
action, params = self._checkRange(
replicator.getSerialCheckResult(min_oid, min_serial, length) == (
count, oid_checksum, max_oid, serial_checksum, max_serial),
(min_oid, min_serial), next_params, length, count)
if action == CHECK_REPLICATE:
((min_oid, min_serial), ) = params
ask(self._doAskObjectHistoryFrom(min_oid, min_serial, count))
if length == count:
action = CHECK_CHUNK
params = (next_params, RANGE_LENGTH)
else:
# Nothing remains, so the replication for this partition is
# finished.
replicator.setReplicationDone()
if p is not None:
conn.ask(p)
action = CHECK_DONE
if action == CHECK_CHUNK:
((min_oid, min_serial), count) = params
ask(self._doAskCheckSerialRange(min_oid, min_serial, count))
if action == CHECK_DONE:
# Nothing remains, so the replication for this partition is
# finished.
replicator.setReplicationDone()
......@@ -237,12 +237,12 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
data_serial)], None, False)
# CheckTIDRange
def test_answerCheckTIDRangeIdenticalChunkWithNext(self):
def test_answerCheckTIDFullRangeIdenticalChunkWithNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
critical_tid = self.getNextTID()
assert max_tid < critical_tid
length = RANGE_LENGTH / 2
length = RANGE_LENGTH
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
......@@ -261,6 +261,30 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
def test_answerCheckTIDSmallRangeIdenticalChunkWithNext(self):
min_tid = self.getNextTID()
max_tid = self.getNextTID()
critical_tid = self.getNextTID()
assert max_tid < critical_tid
length = RANGE_LENGTH / 2
rid = 12
conn = self.getFakeConnection()
app = self.getApp(tid_check_result=(length, 0, max_tid), rid=rid,
conn=conn, critical_tid=critical_tid)
handler = ReplicationHandler(app)
# Peer has the same data as we have: length, checksum and max_tid
# match.
handler.answerCheckTIDRange(conn, min_tid, length, length, 0, max_tid)
# Result: go on with next chunk
pmin_tid, plength, ppartition = self.checkAskPacket(conn,
Packets.AskCheckTIDRange, decode=True)
self.assertEqual(pmin_tid, add64(max_tid, 1))
self.assertEqual(plength, length / 2)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('checkTIDRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_tid, plength, ppartition)
def test_answerCheckTIDRangeIdenticalChunkAboveCriticalTID(self):
critical_tid = self.getNextTID()
min_tid = self.getNextTID()
......@@ -409,12 +433,12 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
# CheckSerialRange
def test_answerCheckSerialRangeIdenticalChunkWithNext(self):
def test_answerCheckSerialFullRangeIdenticalChunkWithNext(self):
min_oid = self.getOID(1)
max_oid = self.getOID(10)
min_serial = self.getNextTID()
max_serial = self.getNextTID()
length = RANGE_LENGTH / 2
length = RANGE_LENGTH
rid = 12
conn = self.getFakeConnection()
app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
......@@ -434,6 +458,31 @@ class StorageReplicationHandlerTests(NeoUnitTestBase):
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
def test_answerCheckSerialSmallRangeIdenticalChunkWithNext(self):
min_oid = self.getOID(1)
max_oid = self.getOID(10)
min_serial = self.getNextTID()
max_serial = self.getNextTID()
length = RANGE_LENGTH / 2
rid = 12
conn = self.getFakeConnection()
app = self.getApp(serial_check_result=(length, 0, max_oid, 1,
max_serial), rid=rid, conn=conn)
handler = ReplicationHandler(app)
# Peer has the same data as we have
handler.answerCheckSerialRange(conn, min_oid, min_serial, length,
length, 0, max_oid, 1, max_serial)
# Result: go on with next chunk
pmin_oid, pmin_serial, plength, ppartition = self.checkAskPacket(conn,
Packets.AskCheckSerialRange, decode=True)
self.assertEqual(pmin_oid, max_oid)
self.assertEqual(pmin_serial, add64(max_serial, 1))
self.assertEqual(plength, length / 2)
self.assertEqual(ppartition, rid)
calls = app.replicator.mockGetNamedCalls('checkSerialRange')
self.assertEqual(len(calls), 1)
calls[0].checkArgs(pmin_oid, pmin_serial, plength, ppartition)
def test_answerCheckSerialRangeIdenticalChunkWithoutNext(self):
min_oid = self.getOID(1)
max_oid = self.getOID(10)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment