Commit 67f4f5aa authored by Yoshinori Okuji's avatar Yoshinori Okuji

Implement handlers for Ask OIDs and Answer OIDs. Use REPLACE instead of INSERT...

Implement handlers for Ask OIDs and Answer OIDs. Use REPLACE instead of INSERT for storing a transaction for safety in replications.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@204 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent aa4995fd
...@@ -17,6 +17,7 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT ...@@ -17,6 +17,7 @@ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICAT
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \ ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \ ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \ ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS, \
NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \ NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \ PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
INTERNAL_ERROR_CODE INTERNAL_ERROR_CODE
...@@ -256,7 +257,7 @@ class EventHandler(object): ...@@ -256,7 +257,7 @@ class EventHandler(object):
def handleAskOIDs(self, conn, packet, first, last, partition): def handleAskOIDs(self, conn, packet, first, last, partition):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAnswerOIDs(self, conn, packet, tid_list): def handleAnswerOIDs(self, conn, packet, oid_list):
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
# Error packet handlers. # Error packet handlers.
...@@ -329,6 +330,8 @@ class EventHandler(object): ...@@ -329,6 +330,8 @@ class EventHandler(object):
d[ANSWER_TRANSACTION_INFORMATION] = self.handleAnswerTransactionInformation d[ANSWER_TRANSACTION_INFORMATION] = self.handleAnswerTransactionInformation
d[ASK_OBJECT_HISTORY] = self.handleAskObjectHistory d[ASK_OBJECT_HISTORY] = self.handleAskObjectHistory
d[ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory d[ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory
d[ASK_OIDS] = self.handleAskOIDs
d[ANSWER_OIDS] = self.handleAnswerOIDs
self.packet_dispatch_table = d self.packet_dispatch_table = d
......
...@@ -129,6 +129,12 @@ class DatabaseManager(object): ...@@ -129,6 +129,12 @@ class DatabaseManager(object):
area as well.""" area as well."""
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def getOIDList(self, offset, length, num_partitions, partition_list):
"""Return a list of OIDs in descending order from an offset,
at most the specified length. The list of partitions are passed
to filter out non-applicable TIDs."""
raise NotImplementedError('this method must be overridden')
def getObjectHistory(self, oid, offset = 0, length = 1): def getObjectHistory(self, oid, offset = 0, length = 1):
"""Return a list of serials and sizes for a given object ID. """Return a list of serials and sizes for a given object ID.
The length specifies the maximum size of such a list. The first serial The length specifies the maximum size of such a list. The first serial
......
...@@ -217,3 +217,7 @@ class StorageEventHandler(EventHandler): ...@@ -217,3 +217,7 @@ class StorageEventHandler(EventHandler):
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list): def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
logging.info('ignoring answer unfinished transactions') logging.info('ignoring answer unfinished transactions')
pass pass
def handleAskOIDs(self, conn, packet, first, last, partition):
logging.info('ignoring ask oids')
pass
...@@ -398,7 +398,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -398,7 +398,7 @@ class MySQLDatabaseManager(DatabaseManager):
for oid, compression, checksum, data in object_list: for oid, compression, checksum, data in object_list:
oid = u64(oid) oid = u64(oid)
data = e(data) data = e(data)
q("""INSERT INTO %s VALUES (%d, %d, %d, %d, '%s')""" \ q("""REPLACE INTO %s VALUES (%d, %d, %d, %d, '%s')""" \
% (obj_table, oid, tid, compression, checksum, data)) % (obj_table, oid, tid, compression, checksum, data))
if transaction is not None: if transaction is not None:
oid_list, user, desc, ext = transaction oid_list, user, desc, ext = transaction
...@@ -406,7 +406,7 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -406,7 +406,7 @@ class MySQLDatabaseManager(DatabaseManager):
user = e(user) user = e(user)
desc = e(desc) desc = e(desc)
ext = e(ext) ext = e(ext)
q("""INSERT INTO %s VALUES (%d, '%s', '%s', '%s', '%s')""" \ q("""REPLACE INTO %s VALUES (%d, '%s', '%s', '%s', '%s')""" \
% (trans_table, tid, oids, user, desc, ext)) % (trans_table, tid, oids, user, desc, ext))
except: except:
self.rollback() self.rollback()
...@@ -467,6 +467,15 @@ class MySQLDatabaseManager(DatabaseManager): ...@@ -467,6 +467,15 @@ class MySQLDatabaseManager(DatabaseManager):
return oid_list, user, desc, ext return oid_list, user, desc, ext
return None return None
def getOIDList(self, offset, length, num_partitions, partition_list):
q = self.query
r = q("""SELECT DISTINCT oid FROM obj WHERE MOD(oid,%d) in (%s)
ORDER BY oid DESC LIMIT %d,%d""" \
% (num_partitions,
','.join([str(p) for p in partition_list]),
offset, length))
return [p64(t[0]) for t in r]
def getObjectHistory(self, oid, offset = 0, length = 1): def getObjectHistory(self, oid, offset = 0, length = 1):
q = self.query q = self.query
oid = u64(oid) oid = u64(oid)
......
...@@ -436,3 +436,29 @@ class OperationEventHandler(StorageEventHandler): ...@@ -436,3 +436,29 @@ class OperationEventHandler(StorageEventHandler):
self.app.replicator.setUnfinishedTIDList(tid_list) self.app.replicator.setUnfinishedTIDList(tid_list)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only
# about usable partitions assigned to me.
if first >= last:
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid offsets'))
return
app = self.app
if partition == INVALID_PARTITION:
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list)
conn.addPacket(Packet().answerOIDs(packet.getId(), oid_list))
...@@ -41,6 +41,18 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -41,6 +41,18 @@ class ReplicationEventHandler(StorageEventHandler):
logging.error('replication is stopped due to connection failure') logging.error('replication is stopped due to connection failure')
self.app.replicator.reset() self.app.replicator.reset()
def timeoutExpired(self, conn):
logging.error('replication is stopped due to timeout')
self.app.replicator.reset()
def connectionClosed(self, conn):
logging.error('replication is stopped due to close')
self.app.replicator.reset()
def peerBroken(self, conn):
logging.error('replication is stopped due to breakage')
self.app.replicator.reset()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas): num_partitions, num_replicas):
...@@ -50,6 +62,8 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -50,6 +62,8 @@ class ReplicationEventHandler(StorageEventHandler):
def handleAnswerTIDs(self, conn, packet, tid_list): def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app app = self.app
if tid_list: if tid_list:
# If I have pending TIDs, check which TIDs I don't have, and
# request the data.
present_tid_list = app.dm.getTIDListPresent(tid_list) present_tid_list = app.dm.getTIDListPresent(tid_list)
tid_set = set(tid_list) tid_set = set(tid_list)
present_tid_set = set(present_tid_list) present_tid_set = set(present_tid_list)
...@@ -61,6 +75,7 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -61,6 +75,7 @@ class ReplicationEventHandler(StorageEventHandler):
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(timeout = 300)
# And, ask more TIDs.
app.replicator.tid_offset += 1000 app.replicator.tid_offset += 1000
offset = app.replicator.tid_offset offset = app.replicator.tid_offset
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -70,6 +85,8 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -70,6 +85,8 @@ class ReplicationEventHandler(StorageEventHandler):
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(timeout = 300) conn.expectMessage(timeout = 300)
else: else:
# If no more TID, a replication of transactions is finished.
# So start to replicate objects now.
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askOIDs(msg_id, 0, 1000, p.askOIDs(msg_id, 0, 1000,
...@@ -84,7 +101,51 @@ class ReplicationEventHandler(StorageEventHandler): ...@@ -84,7 +101,51 @@ class ReplicationEventHandler(StorageEventHandler):
# Directly store the transaction. # Directly store the transaction.
app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), True) app.dm.storeTransaction(tid, (), (oid_list, user, desc, ext), True)
def handleAnswerOIDs(self, conn, packet, oid_list):
app = self.app
if oid_list:
# Pick one up, and ask the history.
oid = oid_list.pop()
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p)
conn.expectMessage(timeout = 300)
app.replicator.serial_offset = 0
app.replicator.oid_list = oid_list
else:
# Nothing remains, so the replication for this partition is
# finished.
app.replicator.replication_done = True
def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
app = self.app
if history_list:
# Check if I have objects, request those which I don't have.
raise NotImplementedError
else:
# This OID is finished. So advance to next.
oid_list = app.replicator.oid_list
if oid_list:
# If I have more pending OIDs, pick one up.
oid = oid_list.pop()
msg_id = conn.getNextId()
p = Packet()
p.askObjectHistory(msg_id, oid, 0, 1000)
conn.addPacket(p)
conn.expectMessage(timeout = 300)
app.replicator.serial_offset = 0
else:
# Otherwise, acquire more OIDs.
app.replicator.oid_offset += 1000
offset = app.replicator.oid_offset
msg_id = conn.getNextId()
p = Packet()
p.askOIDs(msg_id, offset, offset + 1000,
app.replicator.current_partition.getRID())
conn.addPacket(p)
conn.expectMessage(timeout = 300)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment