Commit 04d6a839 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Master recovery handler use PartitionTable.load() that clear the content if the

ptid is different. 
Storage node no more use a 'ptid' attribute on the app instance but use the
getID() accessor from the PartitionTable class.
PartitionTable.clear() now also reset the ptid.
PartitionTable.load() do more checks (taken from master.recovery)


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1081 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 454bcdb2
...@@ -35,15 +35,12 @@ class RecoveryHandler(MasterHandler): ...@@ -35,15 +35,12 @@ class RecoveryHandler(MasterHandler):
pt = app.pt pt = app.pt
# Get max values. # Get max values.
if app.loid < loid: app.loid = max(loid, app.loid)
app.loid = loid app.tid = max(ltid, app.ltid)
if app.ltid < ltid:
app.ltid = ltid
if lptid > pt.getID(): if lptid > pt.getID():
# something newer # something newer
app.pt.setID(lptid)
app.target_uuid = conn.getUUID() app.target_uuid = conn.getUUID()
app.pt.clear() app.pt.setID(lptid)
conn.ask(protocol.askPartitionTable([])) conn.ask(protocol.askPartitionTable([]))
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
...@@ -54,17 +51,5 @@ class RecoveryHandler(MasterHandler): ...@@ -54,17 +51,5 @@ class RecoveryHandler(MasterHandler):
logging.warn('got answer partition table from %s while waiting for %s', logging.warn('got answer partition table from %s while waiting for %s',
dump(uuid), dump(app.target_uuid)) dump(uuid), dump(app.target_uuid))
return return
self.app.pt.load(ptid, row_list, self.app.nm)
for offset, cell_list in row_list:
if offset >= app.pt.getPartitions() or app.pt.hasOffset(offset):
# There must be something wrong.
raise UnexpectedPacketError
for uuid, state in cell_list:
n = app.nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(uuid = uuid)
n.setState(TEMPORARILY_DOWN_STATE)
app.nm.add(n)
app.pt.setCell(offset, n, state)
...@@ -76,6 +76,7 @@ class PartitionTable(object): ...@@ -76,6 +76,7 @@ class PartitionTable(object):
def clear(self): def clear(self):
"""Forget an existing partition table.""" """Forget an existing partition table."""
self.id = None
self.num_filled_rows = 0 self.num_filled_rows = 0
# Note: don't use [[]] * self.np construct, as it duplicates # Note: don't use [[]] * self.np construct, as it duplicates
# instance *references*, so the outer list contains really just one # instance *references*, so the outer list contains really just one
...@@ -171,6 +172,7 @@ class PartitionTable(object): ...@@ -171,6 +172,7 @@ class PartitionTable(object):
self.id = ptid self.id = ptid
self.clear() self.clear()
for offset, row in row_list: for offset, row in row_list:
assert offset < self.getPartitions() and not self.hasOffset(offset)
for uuid, state in row: for uuid, state in row:
node = nm.getNodeByUUID(uuid) node = nm.getNodeByUUID(uuid)
# XXX: the node should be known before we receive the partition # XXX: the node should be known before we receive the partition
......
...@@ -63,8 +63,6 @@ class Application(object): ...@@ -63,8 +63,6 @@ class Application(object):
# The partition table is initialized after getting the number of # The partition table is initialized after getting the number of
# partitions. # partitions.
self.pt = None self.pt = None
# XXX: shoud use self.pt.getID() instead
self.ptid = None
self.replicator = None self.replicator = None
self.listening_conn = None self.listening_conn = None
...@@ -106,9 +104,10 @@ class Application(object): ...@@ -106,9 +104,10 @@ class Application(object):
dm.setName(self.name) dm.setName(self.name)
elif name != self.name: elif name != self.name:
raise RuntimeError('name does not match with the database') raise RuntimeError('name does not match with the database')
self.ptid = dm.getPTID() ptid = dm.getPTID()
logging.info("loaded configuration from db : uuid = %s, ptid = %s, name = %s, np = %s, nr = %s" \ logging.info("Configuration: uuid=%s, ptid=%s, name=%s, np=%s, nr=%s" \
%(dump(self.uuid), dump(self.ptid), name, num_partitions, num_replicas)) % (dump(self.uuid), dump(ptid), name, num_partitions,
num_replicas))
def loadPartitionTable(self): def loadPartitionTable(self):
"""Load a partition table from the database.""" """Load a partition table from the database."""
...@@ -190,7 +189,6 @@ class Application(object): ...@@ -190,7 +189,6 @@ class Application(object):
# table, because the table might be incomplete. # table, because the table might be incomplete.
if pt is not None: if pt is not None:
self.loadPartitionTable() self.loadPartitionTable()
self.ptid = self.dm.getPTID()
if num_partitions != pt.getPartitions(): if num_partitions != pt.getPartitions():
raise RuntimeError('the number of partitions is inconsistent') raise RuntimeError('the number of partitions is inconsistent')
...@@ -200,7 +198,6 @@ class Application(object): ...@@ -200,7 +198,6 @@ class Application(object):
self.dm.setNumReplicas(num_replicas) self.dm.setNumReplicas(num_replicas)
self.pt = PartitionTable(num_partitions, num_replicas) self.pt = PartitionTable(num_partitions, num_replicas)
self.loadPartitionTable() self.loadPartitionTable()
self.ptid = self.dm.getPTID()
def verifyData(self): def verifyData(self):
"""Verify data under the control by a primary master node. """Verify data under the control by a primary master node.
......
...@@ -68,13 +68,16 @@ class HiddenHandler(BaseMasterHandler): ...@@ -68,13 +68,16 @@ class HiddenHandler(BaseMasterHandler):
the information is only about changes from the previous.""" the information is only about changes from the previous."""
# XXX: this is a copy/paste from handlers/master.py # XXX: this is a copy/paste from handlers/master.py
app = self.app app = self.app
if app.ptid >= ptid: if ptid <= app.pt.getID():
# Ignore this packet. # Ignore this packet.
logging.debug('ignoring older partition changes') logging.debug('ignoring older partition changes')
return return
# First, change the table on memory. # update partition table in memory and the database
app.ptid = ptid app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
# Check changes for replications
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
if uuid == app.uuid and app.replicator is not None: if uuid == app.uuid and app.replicator is not None:
# If this is for myself, this can affect replications. # If this is for myself, this can affect replications.
...@@ -83,10 +86,6 @@ class HiddenHandler(BaseMasterHandler): ...@@ -83,10 +86,6 @@ class HiddenHandler(BaseMasterHandler):
elif state == OUT_OF_DATE_STATE: elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset) app.replicator.addPartition(offset)
# update partition table in memory and the database
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
def handleStartOperation(self, conn, packet): def handleStartOperation(self, conn, packet):
self.app.operational = True self.app.operational = True
......
...@@ -39,13 +39,16 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -39,13 +39,16 @@ class MasterOperationHandler(BaseMasterHandler):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
if app.ptid >= ptid: if ptid <= app.pt.getID():
# Ignore this packet. # Ignore this packet.
logging.debug('ignoring older partition changes') logging.debug('ignoring older partition changes')
return return
# First, change the table on memory. # update partition table in memory and the database
app.ptid = ptid app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
# Check changes for replications
for offset, uuid, state in cell_list: for offset, uuid, state in cell_list:
if uuid == app.uuid and app.replicator is not None: if uuid == app.uuid and app.replicator is not None:
# If this is for myself, this can affect replications. # If this is for myself, this can affect replications.
...@@ -54,10 +57,6 @@ class MasterOperationHandler(BaseMasterHandler): ...@@ -54,10 +57,6 @@ class MasterOperationHandler(BaseMasterHandler):
elif state == OUT_OF_DATE_STATE: elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset) app.replicator.addPartition(offset)
# update partition table in memory and the database
app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list)
def handleLockInformation(self, conn, packet, tid): def handleLockInformation(self, conn, packet, tid):
app = self.app app = self.app
try: try:
......
...@@ -27,7 +27,7 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler): ...@@ -27,7 +27,7 @@ class StorageOperationHandler(BaseClientAndStorageOperationHandler):
app = self.app app = self.app
oid = app.dm.getLastOID() oid = app.dm.getLastOID()
tid = app.dm.getLastTID() tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.pt.getID())
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def handleAskOIDs(self, conn, packet, first, last, partition): def handleAskOIDs(self, conn, packet, first, last, partition):
......
...@@ -29,7 +29,7 @@ class VerificationHandler(BaseMasterHandler): ...@@ -29,7 +29,7 @@ class VerificationHandler(BaseMasterHandler):
app = self.app app = self.app
oid = app.dm.getLastOID() oid = app.dm.getLastOID()
tid = app.dm.getLastTID() tid = app.dm.getLastTID()
p = protocol.answerLastIDs(oid, tid, app.ptid) p = protocol.answerLastIDs(oid, tid, app.pt.getID())
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def handleAskPartitionTable(self, conn, packet, offset_list): def handleAskPartitionTable(self, conn, packet, offset_list):
...@@ -50,18 +50,17 @@ class VerificationHandler(BaseMasterHandler): ...@@ -50,18 +50,17 @@ class VerificationHandler(BaseMasterHandler):
except IndexError: except IndexError:
raise protocol.ProtocolError('invalid partition table offset') raise protocol.ProtocolError('invalid partition table offset')
p = protocol.answerPartitionTable(app.ptid, row_list) p = protocol.answerPartitionTable(app.pt.getID(), row_list)
conn.answer(p, packet.getId()) conn.answer(p, packet.getId())
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
app = self.app app = self.app
if app.ptid >= ptid: if ptid <= app.pt.getID():
# Ignore this packet. # Ignore this packet.
logging.debug('ignoring older partition changes') logging.debug('ignoring older partition changes')
return return
app.ptid = ptid
# update partition table in memory and the database # update partition table in memory and the database
app.pt.update(ptid, cell_list, app.nm) app.pt.update(ptid, cell_list, app.nm)
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
......
...@@ -196,7 +196,7 @@ class Replicator(object): ...@@ -196,7 +196,7 @@ class Replicator(object):
self.partition_dict.pop(self.current_partition.getRID()) self.partition_dict.pop(self.current_partition.getRID())
# Notify to a primary master node that my cell is now up-to-date. # Notify to a primary master node that my cell is now up-to-date.
conn = self.primary_master_connection conn = self.primary_master_connection
p = protocol.notifyPartitionChanges( app.ptid, p = protocol.notifyPartitionChanges(app.pt.getID(),
[(self.current_partition.getRID(), app.uuid, UP_TO_DATE_STATE)]) [(self.current_partition.getRID(), app.uuid, UP_TO_DATE_STATE)])
conn.notify(p) conn.notify(p)
except KeyError: except KeyError:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment