Commit 2fba3c61 authored by Grégory Wisniewski

Unify handling of connections closed/timeout/broken in the master base handler,

which updates the node manager so that a lost node is no longer known as RUNNING;
the stale RUNNING state caused 'invalid server address' errors when a node restarted.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@825 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 59f8284b
...@@ -26,12 +26,6 @@ from neo.util import dump ...@@ -26,12 +26,6 @@ from neo.util import dump
class AdministrationEventHandler(MasterEventHandler): class AdministrationEventHandler(MasterEventHandler):
"""This class deals with messages from the admin node only""" """This class deals with messages from the admin node only"""
def _discardNode(self, conn):
uuid = conn.getUUID()
node = self.app.nm.getNodeByUUID(uuid)
if node is not None:
self.app.nm.remove(node)
def handleAskPrimaryMaster(self, conn, packet): def handleAskPrimaryMaster(self, conn, packet):
app = self.app app = self.app
# I'm the primary # I'm the primary
...@@ -40,18 +34,6 @@ class AdministrationEventHandler(MasterEventHandler): ...@@ -40,18 +34,6 @@ class AdministrationEventHandler(MasterEventHandler):
app.sendNodesInformations(conn) app.sendNodesInformations(conn)
app.sendPartitionTable(conn) app.sendPartitionTable(conn)
def connectionClosed(self, conn):
self._discardNode(conn)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
self._discardNode(conn)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
self._discardNode(conn)
MasterEventHandler.peerBroken(self, conn)
def handleSetClusterState(self, conn, packet, name, state): def handleSetClusterState(self, conn, packet, name, state):
self.checkClusterName(name) self.checkClusterName(name)
self.app.changeClusterState(state) self.app.changeClusterState(state)
......
...@@ -19,11 +19,34 @@ import logging ...@@ -19,11 +19,34 @@ import logging
from neo import protocol from neo import protocol
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, BROKEN_STATE
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
def _nodeLost(self, conn, node):
# override this method in sub-handlers to do specific actions when a
# node is lost
pass
def _dropIt(self, conn, node, new_state):
if node is None or node.getState() == new_state:
return
node.setState(new_state)
self.app.broadcastNodeInformation(node)
self._nodeLost(conn, node)
def connectionClosed(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
self._dropIt(conn, node, protocol.TEMPORARILY_DOWN_STATE)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
self._dropIt(conn, node, protocol.TEMPORARILY_DOWN_STATE)
def peerBroken(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
self._dropIt(conn, node, protocol.BROKEN_STATE)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
logging.error('ignoring Notify Node Information in %s', self.__class__.__name__) logging.error('ignoring Notify Node Information in %s', self.__class__.__name__)
...@@ -89,14 +112,14 @@ class MasterEventHandler(EventHandler): ...@@ -89,14 +112,14 @@ class MasterEventHandler(EventHandler):
elif app.primary_master_node is not None: elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID() primary_uuid = app.primary_master_node.getUUID()
else: else:
primary_uuid = INVALID_UUID primary_uuid = protocol.INVALID_UUID
known_master_list = [app.server + (app.uuid, )] known_master_list = [app.server + (app.uuid, )]
for n in app.nm.getMasterNodeList(): for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE: if n.getState() == protocol.BROKEN_STATE:
continue continue
known_master_list.append(n.getServer() + \ known_master_list.append(n.getServer() + \
(n.getUUID() or INVALID_UUID, )) (n.getUUID() or protocol.INVALID_UUID, ))
conn.answer(protocol.answerPrimaryMaster(primary_uuid, conn.answer(protocol.answerPrimaryMaster(primary_uuid,
known_master_list), packet) known_master_list), packet)
......
...@@ -23,14 +23,8 @@ from neo.master.handler import MasterEventHandler ...@@ -23,14 +23,8 @@ from neo.master.handler import MasterEventHandler
class IdentificationEventHandler(MasterEventHandler): class IdentificationEventHandler(MasterEventHandler):
"""This class deals with messages from the admin node only""" """This class deals with messages from the admin node only"""
def connectionClosed(self, conn): def _nodeLost(self, conn, node):
logging.warning('lost a node in IdentificationEventHandler') logging.warning('lost a node in IdentificationEventHandler : %s' % node)
def timeoutExpired(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def peerBroken(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
......
...@@ -32,33 +32,6 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -32,33 +32,6 @@ class RecoveryEventHandler(MasterEventHandler):
# ask the last IDs to perform the recovery # ask the last IDs to perform the recovery
conn.ask(protocol.askLastIDs()) conn.ask(protocol.askLastIDs())
def connectionClosed(self, conn):
app = self.app
uuid = conn.getUUID()
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
app = self.app
uuid = conn.getUUID()
node = app.nm.getNodeByUUID(uuid)
if node.getState() == RUNNING_STATE:
node.setState(TEMPORARILY_DOWN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
app = self.app
uuid = conn.getUUID()
node = app.nm.getNodeByUUID(uuid)
if node.getState() != BROKEN_STATE:
node.setState(BROKEN_STATE)
app.broadcastNodeInformation(node)
MasterEventHandler.peerBroken(self, conn)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
......
...@@ -43,23 +43,10 @@ class SecondaryMasterEventHandler(MasterEventHandler): ...@@ -43,23 +43,10 @@ class SecondaryMasterEventHandler(MasterEventHandler):
class PrimaryMasterEventHandler(MasterEventHandler): class PrimaryMasterEventHandler(MasterEventHandler):
""" Handler used by secondaries to handle primary master""" """ Handler used by secondaries to handle primary master"""
def connectionClosed(self, conn): def _nodeLost(self, conn, node):
if not conn.isServerConnection(): # XXX: why in down state ?
self.app.primary_master_node.setState(DOWN_STATE) self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is dead' raise PrimaryFailure, 'primary master is dead'
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
if not conn.isServerConnection():
self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is down'
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
if not conn.isServerConnection():
self.app.primary_master_node.setState(DOWN_STATE)
raise PrimaryFailure, 'primary master is crazy'
MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
if not conn.isServerConnection(): if not conn.isServerConnection():
......
...@@ -31,27 +31,6 @@ from neo.util import dump ...@@ -31,27 +31,6 @@ from neo.util import dump
class ServiceEventHandler(MasterEventHandler): class ServiceEventHandler(MasterEventHandler):
"""This class deals with events for a service phase.""" """This class deals with events for a service phase."""
def _dropIt(self, conn, node, new_state):
raise RuntimeError('rhis method must be overriden')
def connectionClosed(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node is not None and node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() != BROKEN_STATE:
self._dropIt(conn, node, BROKEN_STATE)
MasterEventHandler.peerBroken(self, conn)
def handleNotifyNodeInformation(self, conn, packet, node_list): def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app app = self.app
for node_type, ip_address, port, uuid, state in node_list: for node_type, ip_address, port, uuid, state in node_list:
...@@ -161,11 +140,8 @@ class ClientServiceEventHandler(ServiceEventHandler): ...@@ -161,11 +140,8 @@ class ClientServiceEventHandler(ServiceEventHandler):
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
pass pass
def _dropIt(self, conn, node, new_state): def _nodeLost(self, conn, node):
app = self.app app = self.app
node.setState(new_state)
app.broadcastNodeInformation(node)
app.nm.remove(node)
for tid, t in app.finishing_transaction_dict.items(): for tid, t in app.finishing_transaction_dict.items():
if t.getConnection() is conn: if t.getConnection() is conn:
del app.finishing_transaction_dict[tid] del app.finishing_transaction_dict[tid]
...@@ -234,28 +210,10 @@ class StorageServiceEventHandler(ServiceEventHandler): ...@@ -234,28 +210,10 @@ class StorageServiceEventHandler(ServiceEventHandler):
if node.getState() == RUNNING_STATE: if node.getState() == RUNNING_STATE:
conn.notify(protocol.startOperation()) conn.notify(protocol.startOperation())
def _dropIt(self, conn, node, new_state): def _nodeLost(self, conn, node):
app = self.app if not self.app.pt.operational():
node.setState(new_state)
app.broadcastNodeInformation(node)
if not app.pt.operational():
raise OperationFailure, 'cannot continue operation' raise OperationFailure, 'cannot continue operation'
def connectionClosed(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() == RUNNING_STATE:
self._dropIt(conn, node, TEMPORARILY_DOWN_STATE)
def peerBroken(self, conn):
node = self.app.nm.getNodeByUUID(conn.getUUID())
if node.getState() != BROKEN_STATE:
self._dropIt(conn, node, BROKEN_STATE)
def handleNotifyInformationLocked(self, conn, packet, tid): def handleNotifyInformationLocked(self, conn, packet, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
app = self.app app = self.app
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment