Commit 883bc312 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Remove replicator handler from replicator.py, it has moved to

handlers/replication.py in commit #778.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@783 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b1e46c47
......@@ -52,133 +52,6 @@ class Partition(object):
return False
return True
class ReplicationEventHandler(StorageEventHandler):
"""This class handles events for replications."""
def connectionCompleted(self, conn):
# Nothing to do.
pass
def connectionFailed(self, conn):
logging.error('replication is stopped due to connection failure')
self.app.replicator.reset()
def timeoutExpired(self, conn):
logging.error('replication is stopped due to timeout')
self.app.replicator.reset()
def connectionClosed(self, conn):
logging.error('replication is stopped due to close')
self.app.replicator.reset()
def peerBroken(self, conn):
logging.error('replication is stopped due to breakage')
self.app.replicator.reset()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
# Nothing to do.
pass
def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app
if app.replicator.current_connection is not conn:
return
if tid_list:
# If I have pending TIDs, check which TIDs I don't have, and
# request the data.
present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list) - set(present_tid_list)
for tid in tid_set:
conn.ask(protocol.askTransactionInformation(tid), timeout=300)
# And, ask more TIDs.
app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset
p = protocol.askTIDs(offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
else:
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
p = protocol.askOIDs(0, 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
app.replicator.oid_offset = 0
def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list):
app = self.app
if app.replicator.current_connection is not conn:
return
# Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), False)
def handleAnswerOIDs(self, conn, packet, oid_list):
app = self.app
if app.replicator.current_connection is not conn:
return
if oid_list:
# Pick one up, and ask the history.
oid = oid_list.pop()
conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
app.replicator.serial_offset = 0
app.replicator.oid_list = oid_list
else:
# Nothing remains, so the replication for this partition is
# finished.
app.replicator.replication_done = True
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
app = self.app
if app.replicator.current_connection is not conn:
return
if history_list:
# Check if I have objects, request those which I don't have.
serial_list = [t[0] for t in history_list]
present_serial_list = app.dm.getSerialListPresent(oid, serial_list)
serial_set = set(serial_list) - set(present_serial_list)
for serial in serial_set:
conn.ask(protocol.askObject(oid, serial, INVALID_TID), timeout=300)
# And, ask more serials.
app.replicator.serial_offset += 1000
offset = app.replicator.serial_offset
p = protocol.askObjectHistory(oid, offset, offset + 1000)
conn.ask(p, timeout=300)
else:
# This OID is finished. So advance to next.
oid_list = app.replicator.oid_list
if oid_list:
# If I have more pending OIDs, pick one up.
oid = oid_list.pop()
conn.ask(protocol.askObjectHistory(oid, 0, 1000), timeout=300)
app.replicator.serial_offset = 0
else:
# Otherwise, acquire more OIDs.
app.replicator.oid_offset += 1000
offset = app.replicator.oid_offset
p = protocol.askOIDs(offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.ask(p, timeout=300)
def handleAnswerObject(self, conn, packet, oid, serial_start,
serial_end, compression, checksum, data):
app = self.app
if app.replicator.current_connection is not conn:
return
# Directly store the transaction.
obj = (oid, compression, checksum, data)
app.dm.storeTransaction(serial_start, [obj], None, False)
del obj
del data
class Replicator(object):
"""This class handles replications of objects and transactions.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment