Commit 783a64a6 authored by Yoshinori Okuji's avatar Yoshinori Okuji

Start to implement a verification step. Solve compicated situations in...

Start to implement a verification step. Solve compicated situations in handling request node identification packets.

git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@28 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 5155ff33
class NeoException(Exception): pass class NeoException(Exception): pass
class ElectionFailure(NeoException): pass class ElectionFailure(NeoException): pass
class PrimaryFailure(NeoException): pass class PrimaryFailure(NeoException): pass
class VerificationFailure(NeoException): pass
...@@ -12,7 +12,7 @@ from neo.node import NodeManager, MasterNode, StorageNode, ClientNode ...@@ -12,7 +12,7 @@ from neo.node import NodeManager, MasterNode, StorageNode, ClientNode
from neo.event import EventManager from neo.event import EventManager
from neo.util import dump from neo.util import dump
from neo.connection import ListeningConnection, ClientConnection, ServerConnection from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure
from neo.master.election import ElectionEventHandler from neo.master.election import ElectionEventHandler
from neo.master.recovery import RecoveryEventHandler from neo.master.recovery import RecoveryEventHandler
from neo.pt import PartitionTable from neo.pt import PartitionTable
...@@ -262,24 +262,41 @@ class Application(object): ...@@ -262,24 +262,41 @@ class Application(object):
if node_type == CLIENT_NODE_TYPE: if node_type == CLIENT_NODE_TYPE:
# Only to master nodes and storage nodes. # Only to master nodes and storage nodes.
for c in em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
n = nm.getNodeByUUID(c.getUUID()) n = self.nm.getNodeByUUID(c.getUUID())
if isinstance(n, (MasterNode, StorageNode)): if isinstance(n, (MasterNode, StorageNode)):
p = Packet() p = Packet()
node_list = (node_type, ip_address, port, uuid, state) node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeStateChange(c.getNextId(), node_list) p.notifyNodeInformation(c.getNextId(), node_list)
c.addPacket(p) c.addPacket(p)
elif isinstance(node, (MasterNode, StorageNode)): elif isinstance(node, (MasterNode, StorageNode)):
for c in em.getConnectionList(): for c in self.em.getConnectionList():
if c.getUUID() is not None: if c.getUUID() is not None:
p = Packet() p = Packet()
node_list = (node_type, ip_address, port, uuid, state) node_list = [(node_type, ip_address, port, uuid, state)]
p.notifyNodeStateChange(c.getNextId(), node_list) p.notifyNodeInformation(c.getNextId(), node_list)
c.addPacket(p) c.addPacket(p)
else: else:
raise Runtime, 'unknown node type' raise Runtime, 'unknown node type'
def broadcastPartitionChanges(self, ptid, cell_list):
"""Broadcast a Notify Partition Changes packet."""
for c in em.getConnectionList():
if c.getUUID() is not None:
n = self.nm.getNodeByUUID(c.getUUID())
if isinstance(n, (ClientNode, StorageNode)):
# Split the packet if too big.
size = len(cell_list)
start = 0
while size:
amt = min(10000, size)
p = Packet()
p.notifyPartitionChanges(ptid, cell_list[start:start+amt])
c.addPacket(p)
size -= amt
start += amt
def recoverStatus(self): def recoverStatus(self):
logging.info('begin the recovery of the status') logging.info('begin the recovery of the status')
...@@ -350,14 +367,14 @@ class Application(object): ...@@ -350,14 +367,14 @@ class Application(object):
start = 0 start = 0
size = self.num_partitions size = self.num_partitions
while size: while size:
len = min(1000, size) amt = min(1000, size)
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askPartitionTable(msg_id, range(start, start + len)) p.askPartitionTable(msg_id, range(start, start + amt))
conn.addPacket(p) conn.addPacket(p)
conn.expectMessage(msg_id) conn.expectMessage(msg_id)
size -= len size -= amt
start += len start += amt
t = time() t = time()
while 1: while 1:
...@@ -389,9 +406,37 @@ class Application(object): ...@@ -389,9 +406,37 @@ class Application(object):
def playPrimaryRole(self): def playPrimaryRole(self):
logging.info('play the primary role') logging.info('play the primary role')
recovering = True
while recovering:
self.recoverStatus() self.recoverStatus()
recovering = False
try:
self.verifyData()
except VerificationFailure:
recovering = True
raise NotImplementedError raise NotImplementedError
def playSecondaryRole(self): def playSecondaryRole(self):
logging.info('play the secondary role') logging.info('play the secondary role')
raise NotImplementedError raise NotImplementedError
def getNextPartitionTableID(self):
if self.lptid is None:
raise RuntimeError, 'I do not know the last Partition Table ID'
l = []
append = l.append
for c in self.lptid:
append(c)
for i in xrange(7, -1, -1):
d = ord(l[i])
if d == 255:
l[i] = chr(0)
else:
l[i] = chr(d + 1)
break
else:
raise RuntimeError, 'Partition Table ID overflowed'
self.lptid = ''.join(l)
return self.lptid
...@@ -50,10 +50,6 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -50,10 +50,6 @@ class RecoveryEventHandler(MasterEventHandler):
MasterEventHandler.peerBroken(self, conn) MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() in (TEMPORARILY_DOWN_STATE, DOWN_STATE):
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.packetReceived(self, conn, packet) MasterEventHandler.packetReceived(self, conn, packet)
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
...@@ -71,18 +67,83 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -71,18 +67,83 @@ class RecoveryEventHandler(MasterEventHandler):
conn.abort() conn.abort()
return return
# Here are many situations. In principle, a node should be identified by
# an UUID, since an UUID never change when moving a storage node to a different
# server, and an UUID always changes for a master node and a client node whenever
# it restarts, so more reliable than a server address.
#
# However, master nodes can be known only as the server addresses. And, a node
# may claim a server address used by another node.
addr = (ip_address, port) addr = (ip_address, port)
# First, get the node by the UUID.
node = app.nm.getNodeByUUID(uuid)
if node is None:
# If nothing is present, try with the server address.
node = app.nm.getNodeByServer(addr) node = app.nm.getNodeByServer(addr)
if node is None: if node is None:
# Nothing is found. So this must be the first time that this node
# connected to me.
if node_type == MASTER_NODE_TYPE: if node_type == MASTER_NODE_TYPE:
node = MasterNode(server = addr, uuid = uuid) node = MasterNode(server = addr, uuid = uuid)
else: else:
node = StorageNode(server = address, uuid = uuid) node = StorageNode(server = address, uuid = uuid)
app.nm.add(node) app.nm.add(node)
app.broadcastNodeInformation(node)
else:
# Otherwise, I know it only by the server address or the same server
# address but with a different UUID.
if node.getUUID() is None:
# This must be a master node.
if not isinstance(node, MasterNode) or node_type != MASTER_NODE_TYPE:
# Error. This node uses the same server address as a master
# node.
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid server address'))
conn.abort()
return
node.setUUID(uuid)
if node.getState() != RUNNING_STATE:
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# This node has a different UUID.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid server address'))
conn.abort()
return
else:
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setUUID(uuid)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else:
# I know this node by the UUID.
if node.getServer() != addr:
# This node has a different server address.
if node.getState() == RUNNING_STATE:
# If it is still running, reject this node.
conn.addPacket(Packet().protocolError(packet.getId(),
'invalid server address'))
conn.abort()
return
else:
# Otherwise, forget the old one.
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
# And insert a new one.
node.setServer(addr)
node.setState(RUNNING_STATE)
app.broadcastNodeInformation(node)
else: else:
# If this node is broken, reject it. # If this node is broken, reject it. Otherwise, assume that it is
# working again.
if node.getState() == BROKEN_STATE: if node.getState() == BROKEN_STATE:
if node.getUUID() == uuid:
p = Packet() p = Packet()
p.brokenNodeDisallowedError(packet.getId(), 'go away') p.brokenNodeDisallowedError(packet.getId(), 'go away')
conn.addPacket(p) conn.addPacket(p)
......
...@@ -63,6 +63,9 @@ class PartitionTable(object): ...@@ -63,6 +63,9 @@ class PartitionTable(object):
self.num_filled_rows = self.np self.num_filled_rows = self.np
def setCell(self, offset, node, state): def setCell(self, offset, node, state):
if node.getState() in (BROKEN_STATE, DOWN_STATE):
return
row = self.partition_list[offset] row = self.partition_list[offset]
if row is None: if row is None:
# Create a new row. # Create a new row.
...@@ -101,3 +104,16 @@ class PartitionTable(object): ...@@ -101,3 +104,16 @@ class PartitionTable(object):
return False return False
return True return True
def dropNode(self, node):
cell_list = []
uuid = node.getUUID()
for offset, row in enumerate(self.partition_list):
if row is not None:
for cell in row:
if cell.getNode() == node:
row.remove(cell)
cell_list.append((offset, uuid, DISCARDED_STATE))
break
return cell_list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment