Commit 82c18e80 authored by Grégory Wisniewski's avatar Grégory Wisniewski

On the way of the previous commit, handleConnectionLost is now defined and

called from the generic handler. This allow use it in some part of client and
storage handlers.
Add some XXX...


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@1091 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 14eafdd4
...@@ -45,7 +45,8 @@ class BaseHandler(EventHandler): ...@@ -45,7 +45,8 @@ class BaseHandler(EventHandler):
else: else:
self.dispatch(conn, packet) self.dispatch(conn, packet)
def _notifyQueues(self, conn):
def handleConnectionLost(self, conn, new_state):
""" """
Put fake packets to task queues so that threads waiting for an Put fake packets to task queues so that threads waiting for an
answer get notified of the disconnection. answer get notified of the disconnection.
...@@ -60,22 +61,16 @@ class BaseHandler(EventHandler): ...@@ -60,22 +61,16 @@ class BaseHandler(EventHandler):
for queue in queue_set: for queue in queue_set:
queue.put((conn, None)) queue.put((conn, None))
def connectionClosed(self, conn):
super(BaseHandler, self).connectionClosed(conn)
self._notifyQueues(conn)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
super(BaseHandler, self).timeoutExpired(conn) super(BaseHandler, self).timeoutExpired(conn)
# XXX: in event.py, the connection is closed after trigger this event,
# so this should not be closed here, but the lock must remains...
conn.lock() conn.lock()
try: try:
conn.close() conn.close()
finally: finally:
conn.release() conn.release()
self._notifyQueues(conn)
def connectionFailed(self, conn):
super(BaseHandler, self).connectionFailed(conn)
self._notifyQueues(conn)
def unexpectedInAnswerHandler(*args, **kw): def unexpectedInAnswerHandler(*args, **kw):
raise Exception('Unexpected event in an answer handler') raise Exception('Unexpected event in an answer handler')
......
...@@ -22,6 +22,7 @@ from neo.protocol import PacketMalformedError, UnexpectedPacketError, \ ...@@ -22,6 +22,7 @@ from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
BrokenNodeDisallowedError, NotReadyError, ProtocolError BrokenNodeDisallowedError, NotReadyError, ProtocolError
from neo.connection import ServerConnection from neo.connection import ServerConnection
from neo import protocol
from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \ from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \ ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \ REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
...@@ -54,6 +55,8 @@ class EventHandler(object): ...@@ -54,6 +55,8 @@ class EventHandler(object):
self.packet_dispatch_table = self.initPacketDispatchTable() self.packet_dispatch_table = self.initPacketDispatchTable()
self.error_dispatch_table = self.initErrorDispatchTable() self.error_dispatch_table = self.initErrorDispatchTable()
# XXX: there is an inconsistency between connection* and handle* names. As
# we are in an hander, I think that's redondant to prefix with 'handle'
def connectionStarted(self, conn): def connectionStarted(self, conn):
"""Called when a connection is started.""" """Called when a connection is started."""
logging.debug('connection started for %s:%d', *(conn.getAddress())) logging.debug('connection started for %s:%d', *(conn.getAddress()))
...@@ -74,21 +77,29 @@ class EventHandler(object): ...@@ -74,21 +77,29 @@ class EventHandler(object):
# A request for a node identification should arrive. # A request for a node identification should arrive.
new_conn.expectMessage(timeout = 10, additional_timeout = 0) new_conn.expectMessage(timeout = 10, additional_timeout = 0)
def handleConnectionLost(self, conn, new_state):
""" this is a method to override in sub-handlers when there is no need
to make distinction from the kind event that closed the connection """
pass
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
"""Called when a timeout event occurs.""" """Called when a timeout event occurs."""
logging.debug('timeout expired for %s:%d', *(conn.getAddress())) logging.debug('timeout expired for %s:%d', *(conn.getAddress()))
self.handleConnectionLost(conn, protocol.TEMPORARILY_DOWN_STATE)
def connectionClosed(self, conn): def connectionClosed(self, conn):
"""Called when a connection is closed by the peer.""" """Called when a connection is closed by the peer."""
logging.debug('connection closed for %s:%d', *(conn.getAddress())) logging.debug('connection closed for %s:%d', *(conn.getAddress()))
self.handleConnectionLost(conn, protocol.TEMPORARILY_DOWN_STATE)
def packetReceived(self, conn, packet):
"""Called when a packet is received."""
self.dispatch(conn, packet)
def peerBroken(self, conn): def peerBroken(self, conn):
"""Called when a peer is broken.""" """Called when a peer is broken."""
logging.error('%s:%d is broken', *(conn.getAddress())) logging.error('%s:%d is broken', *(conn.getAddress()))
self.handleConnectionLost(conn, protocol.BROKEN_STATE)
def packetReceived(self, conn, packet):
"""Called when a packet is received."""
self.dispatch(conn, packet)
def packetMalformed(self, conn, packet, message='', *args): def packetMalformed(self, conn, packet, message='', *args):
"""Called when a packet is malformed.""" """Called when a packet is malformed."""
......
...@@ -23,21 +23,6 @@ from neo.handler import EventHandler ...@@ -23,21 +23,6 @@ from neo.handler import EventHandler
class MasterHandler(EventHandler): class MasterHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
# XXX: this may be defined/done in the generic handler
def _handleConnectionLost(self, conn, node, new_state):
# override this method in sub-handlers to do specific actions when a
# node is lost
pass
def connectionClosed(self, conn):
self._handleConnectionLost(conn, protocol.TEMPORARILY_DOWN_STATE)
def timeoutExpired(self, conn):
self._handleConnectionLost(conn, protocol.TEMPORARILY_DOWN_STATE)
def peerBroken(self, conn):
self._handleConnectionLost(conn, protocol.BROKEN_STATE)
def handleProtocolError(self, conn, packet, message): def handleProtocolError(self, conn, packet, message):
logging.error('Protocol error %s %s' % (message, conn.getAddress())) logging.error('Protocol error %s %s' % (message, conn.getAddress()))
...@@ -91,7 +76,7 @@ class BaseServiceHandler(MasterHandler): ...@@ -91,7 +76,7 @@ class BaseServiceHandler(MasterHandler):
# It is triggered when a connection to a node gets lost. # It is triggered when a connection to a node gets lost.
pass pass
def _handleConnectionLost(self, conn, new_state): def handleConnectionLost(self, conn, new_state):
node = self.app.nm.getNodeByUUID(conn.getUUID()) node = self.app.nm.getNodeByUUID(conn.getUUID())
if node is None or node.getState() == new_state: if node is None or node.getState() == new_state:
return return
......
...@@ -32,14 +32,8 @@ class BaseStorageHandler(EventHandler): ...@@ -32,14 +32,8 @@ class BaseStorageHandler(EventHandler):
class BaseMasterHandler(BaseStorageHandler): class BaseMasterHandler(BaseStorageHandler):
def timeoutExpired(self, conn): def handleConnectionLost(self, conn, new_state):
raise PrimaryFailure('times out') raise PrimaryFailure('connection lost')
def connectionClosed(self, conn):
raise PrimaryFailure('dead')
def peerBroken(self, conn):
raise PrimaryFailure('broken')
def handleReelectPrimaryMaster(self, conn, packet): def handleReelectPrimaryMaster(self, conn, packet):
raise PrimaryFailure('re-election occurs') raise PrimaryFailure('re-election occurs')
......
...@@ -25,14 +25,8 @@ from neo.node import ClientNode ...@@ -25,14 +25,8 @@ from neo.node import ClientNode
class IdentificationHandler(BaseStorageHandler): class IdentificationHandler(BaseStorageHandler):
""" Handler used for incoming connections during operation state """ """ Handler used for incoming connections during operation state """
def connectionClosed(self, conn): def handleConnectionLost(self, conn, new_state):
logging.warning('lost a node in IdentificationEventHandler') logging.warning('A connection was lost during identification')
def timeoutExpired(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def peerBroken(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, address, name): uuid, address, name):
......
...@@ -28,20 +28,12 @@ class ReplicationHandler(BaseStorageHandler): ...@@ -28,20 +28,12 @@ class ReplicationHandler(BaseStorageHandler):
# Nothing to do. # Nothing to do.
pass pass
def connectionFailed(self, conn): def handleConnectionLost(self, conn, new_state):
logging.error('replication is stopped due to connection failure') logging.error('replication is stopped due to a connection lost')
self.app.replicator.reset()
def timeoutExpired(self, conn):
logging.error('replication is stopped due to timeout')
self.app.replicator.reset() self.app.replicator.reset()
def connectionClosed(self, conn): def connectionFailed(self, conn):
logging.error('replication is stopped due to close') logging.error('replication is stopped due to connection failure')
self.app.replicator.reset()
def peerBroken(self, conn):
logging.error('replication is stopped due to breakage')
self.app.replicator.reset() self.app.replicator.reset()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
......
...@@ -20,9 +20,6 @@ from neo.storage.handlers import BaseClientAndStorageOperationHandler ...@@ -20,9 +20,6 @@ from neo.storage.handlers import BaseClientAndStorageOperationHandler
class StorageOperationHandler(BaseClientAndStorageOperationHandler): class StorageOperationHandler(BaseClientAndStorageOperationHandler):
def connectionCompleted(self, conn):
BaseClientAndStorageOperationHandler.connectionCompleted(self, conn)
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
app = self.app app = self.app
oid = app.dm.getLastOID() oid = app.dm.getLastOID()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment