Commit 4b6c1387 authored by Julien Muchembled's avatar Julien Muchembled

connection: make close always call handler (connectionClosed or connectionFailed)

Main reason is that it's difficult to know in advance which side really closes
the connection. Network events can be chaotic and this could lead to many race
conditions.
Thus, handler can be used to update any database that is somewhat redundant
to the connection status, i.e. node status usually. Safely and less duplicated
code.

This change is motivated by recurrent random failures during election.
An example of race condition was that 2 fully connected master could close the
extra connection (the primary -> secondary one) at the same time.

In order to stabilize lower-level code and start with reliable election process,
code has also been simplified to not care about node states. All connections
without exception are closed at the end of the election and states are then
updated 1 by 1 by identification handler.
Note that during election, there may be 2 connection per node, which makes
difficult to update node states by connectionFailed/connectionClosed events.

timeoutExpired & peerBroken are dropped as they are unused for the moment.
A new API should be designed so that connectionClosed know the reason of the
close.
BROKEN state becomes unused.

git-svn-id: https://svn.erp5.org/repos/neo/trunk@2732 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 35f0a5bf
...@@ -122,15 +122,9 @@ class MasterEventHandler(EventHandler): ...@@ -122,15 +122,9 @@ class MasterEventHandler(EventHandler):
def connectionFailed(self, conn): def connectionFailed(self, conn):
self._connectionLost(conn) self._connectionLost(conn)
def timeoutExpired(self, conn):
self._connectionLost(conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
self._connectionLost(conn) self._connectionLost(conn)
def peerBroken(self, conn):
self._connectionLost(conn)
def dispatch(self, conn, packet): def dispatch(self, conn, packet):
if packet.isResponse() and \ if packet.isResponse() and \
self.app.dispatcher.registered(packet.getId()): self.app.dispatcher.registered(packet.getId()):
......
...@@ -68,10 +68,10 @@ class PrimaryBootstrapHandler(AnswerBaseHandler): ...@@ -68,10 +68,10 @@ class PrimaryBootstrapHandler(AnswerBaseHandler):
neo.lib.logging.warning('Unknown primary master UUID: %s. ' \ neo.lib.logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid)) 'Ignoring.' % dump(primary_uuid))
else: else:
app.primary_master_node = primary_node
if app.trying_master_node is not primary_node: if app.trying_master_node is not primary_node:
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
app.primary_master_node = primary_node
else: else:
if app.primary_master_node is not None: if app.primary_master_node is not None:
# The primary master node is not a primary master node # The primary master node is not a primary master node
...@@ -93,27 +93,14 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -93,27 +93,14 @@ class PrimaryNotificationsHandler(BaseHandler):
def connectionClosed(self, conn): def connectionClosed(self, conn):
app = self.app app = self.app
if app.master_conn is not None:
neo.lib.logging.critical("connection to primary master node closed") neo.lib.logging.critical("connection to primary master node closed")
conn.close()
app.master_conn = None app.master_conn = None
app.primary_master_node = None app.primary_master_node = None
else:
assert app.primary_master_node is None
super(PrimaryNotificationsHandler, self).connectionClosed(conn) super(PrimaryNotificationsHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
neo.lib.logging.critical(
"connection timeout to primary master node expired")
BaseHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
app = self.app
if app.master_conn is not None:
assert conn is app.master_conn
neo.lib.logging.critical("primary master node is broken")
BaseHandler.peerBroken(self, conn)
def stopOperation(self, conn): def stopOperation(self, conn):
neo.lib.logging.critical("master node ask to stop operation") neo.lib.logging.critical("master node ask to stop operation")
...@@ -139,19 +126,14 @@ class PrimaryNotificationsHandler(BaseHandler): ...@@ -139,19 +126,14 @@ class PrimaryNotificationsHandler(BaseHandler):
self.app.pt.update(ptid, cell_list, self.app.nm) self.app.pt.update(ptid, cell_list, self.app.nm)
def notifyNodeInformation(self, conn, node_list): def notifyNodeInformation(self, conn, node_list):
app = self.app nm = self.app.nm
self.app.nm.update(node_list) nm.update(node_list)
for node_type, addr, uuid, state in node_list: for node_type, addr, uuid, state in node_list:
if state != NodeStates.RUNNING: if state != NodeStates.RUNNING:
# close connection to this node if no longer running # close connection to this node if no longer running
node = self.app.nm.getByUUID(uuid) node = nm.getByUUID(uuid)
if node and node.isConnected(): if node and node.isConnected():
conn = node.getConnection() node.getConnection().close()
conn.close()
if node_type == NodeTypes.STORAGE:
# Remove from pool connection
app.cp.removeConnection(conn)
self.dispatcher.unregister(conn)
class PrimaryAnswersHandler(AnswerBaseHandler): class PrimaryAnswersHandler(AnswerBaseHandler):
......
...@@ -77,8 +77,6 @@ class BootstrapManager(EventHandler): ...@@ -77,8 +77,6 @@ class BootstrapManager(EventHandler):
handle the client node. handle the client node.
Close connection and restart. Close connection and restart.
""" """
# master are still electing on of them
self.current = None
conn.close() conn.close()
def answerPrimary(self, conn, primary_uuid, known_master_list): def answerPrimary(self, conn, primary_uuid, known_master_list):
...@@ -102,7 +100,6 @@ class BootstrapManager(EventHandler): ...@@ -102,7 +100,6 @@ class BootstrapManager(EventHandler):
# - something goes wrong (unknown UUID) # - something goes wrong (unknown UUID)
# - this master doesn't know who's the primary # - this master doesn't know who's the primary
# - got the primary's uuid, so cut here # - got the primary's uuid, so cut here
self.current = None
conn.close() conn.close()
return return
......
...@@ -184,7 +184,7 @@ class HandlerSwitcher(object): ...@@ -184,7 +184,7 @@ class HandlerSwitcher(object):
notification = Packets.Notify('Unexpected answer: %r' % packet) notification = Packets.Notify('Unexpected answer: %r' % packet)
connection.notify(notification) connection.notify(notification)
connection.abort() connection.abort()
handler.peerBroken(connection) # handler.peerBroken(connection)
# apply a pending handler if no more answers are pending # apply a pending handler if no more answers are pending
while len(self._pending) > 1 and not self._pending[0][0]: while len(self._pending) > 1 and not self._pending[0][0]:
del self._pending[0] del self._pending[0]
...@@ -290,13 +290,11 @@ class BaseConnection(object): ...@@ -290,13 +290,11 @@ class BaseConnection(object):
neo.lib.logging.info( neo.lib.logging.info(
'timeout for #0x%08x with %r', msg_id, self) 'timeout for #0x%08x with %r', msg_id, self)
self.close() self.close()
self.getHandler().timeoutExpired(self)
elif self._timeout.hardExpired(t): elif self._timeout.hardExpired(t):
# critical time reach or pong not received, abort # critical time reach or pong not received, abort
neo.lib.logging.info('timeout with %r', self) neo.lib.logging.info('timeout with %r', self)
self.notify(Packets.Notify('Timeout')) self.notify(Packets.Notify('Timeout'))
self.abort() self.abort()
self.getHandler().timeoutExpired(self)
elif self._timeout.softExpired(t): elif self._timeout.softExpired(t):
self._timeout.ping(t) self._timeout.ping(t)
self.ping() self.ping()
...@@ -421,6 +419,8 @@ class ListeningConnection(BaseConnection): ...@@ -421,6 +419,8 @@ class ListeningConnection(BaseConnection):
class Connection(BaseConnection): class Connection(BaseConnection):
"""A connection.""" """A connection."""
connecting = False
def __init__(self, event_manager, handler, connector, addr=None): def __init__(self, event_manager, handler, connector, addr=None):
BaseConnection.__init__(self, event_manager, handler, BaseConnection.__init__(self, event_manager, handler,
connector=connector, addr=addr) connector=connector, addr=addr)
...@@ -460,16 +460,6 @@ class Connection(BaseConnection): ...@@ -460,16 +460,6 @@ class Connection(BaseConnection):
self.cur_id = (next_id + 1) & 0xffffffff self.cur_id = (next_id + 1) & 0xffffffff
return next_id return next_id
def close(self):
neo.lib.logging.debug('closing a connector for %r', self)
BaseConnection.close(self)
if self._on_close is not None:
self._on_close()
self._on_close = None
del self.write_buf[:]
self.read_buf.clear()
self._handlers.clear()
def abort(self): def abort(self):
"""Abort dealing with this connection.""" """Abort dealing with this connection."""
neo.lib.logging.debug('aborting a connector for %r', self) neo.lib.logging.debug('aborting a connector for %r', self)
...@@ -534,18 +524,33 @@ class Connection(BaseConnection): ...@@ -534,18 +524,33 @@ class Connection(BaseConnection):
def pending(self): def pending(self):
return self.connector is not None and self.write_buf return self.connector is not None and self.write_buf
def _closure(self, was_connected=True): def close(self):
assert self.connector is not None, self.whoSetConnector() if self.connector is None:
assert self._on_close is None
assert not self.read_buf
assert not self.write_buf
assert not self.isPending()
return
# process the network events with the last registered handler to # process the network events with the last registered handler to
# solve issues where a node is lost with pending handlers and # solve issues where a node is lost with pending handlers and
# create unexpected side effects. # create unexpected side effects.
# XXX: This solution is being tested and should be approved or reverted neo.lib.logging.debug('closing a connector for %r', self)
handler = self._handlers.getLastHandler() handler = self._handlers.getLastHandler()
self.close() super(Connection, self).close()
if was_connected: if self._on_close is not None:
handler.connectionClosed(self) self._on_close()
else: self._on_close = None
del self.write_buf[:]
self.read_buf.clear()
self._handlers.clear()
if self.connecting:
handler.connectionFailed(self) handler.connectionFailed(self)
else:
handler.connectionClosed(self)
def _closure(self):
assert self.connector is not None, self.whoSetConnector()
self.close()
@profiler_decorator @profiler_decorator
def _recv(self): def _recv(self):
...@@ -555,8 +560,8 @@ class Connection(BaseConnection): ...@@ -555,8 +560,8 @@ class Connection(BaseConnection):
except ConnectorTryAgainException: except ConnectorTryAgainException:
pass pass
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
# should only occur while connecting assert self.connecting
self._closure(was_connected=False) self._closure()
except ConnectorConnectionClosedException: except ConnectorConnectionClosedException:
# connection resetted by peer, according to the man, this error # connection resetted by peer, according to the man, this error
# should not occurs but it seems it's false # should not occurs but it seems it's false
...@@ -667,8 +672,9 @@ class Connection(BaseConnection): ...@@ -667,8 +672,9 @@ class Connection(BaseConnection):
class ClientConnection(Connection): class ClientConnection(Connection):
"""A connection from this node to a remote node.""" """A connection from this node to a remote node."""
connecting = True
def __init__(self, event_manager, handler, addr, connector, **kw): def __init__(self, event_manager, handler, addr, connector, **kw):
self.connecting = True
Connection.__init__(self, event_manager, handler, addr=addr, Connection.__init__(self, event_manager, handler, addr=addr,
connector=connector) connector=connector)
handler.connectionStarted(self) handler.connectionStarted(self)
...@@ -681,10 +687,10 @@ class ClientConnection(Connection): ...@@ -681,10 +687,10 @@ class ClientConnection(Connection):
self.connecting = False self.connecting = False
self.getHandler().connectionCompleted(self) self.getHandler().connectionCompleted(self)
except ConnectorConnectionRefusedException: except ConnectorConnectionRefusedException:
self._closure(was_connected=False) self._closure()
except ConnectorException: except ConnectorException:
# unhandled connector exception # unhandled connector exception
self._closure(was_connected=False) self._closure()
raise raise
def writable(self): def writable(self):
...@@ -692,7 +698,7 @@ class ClientConnection(Connection): ...@@ -692,7 +698,7 @@ class ClientConnection(Connection):
if self.connecting: if self.connecting:
err = self.connector.getError() err = self.connector.getError()
if err: if err:
self._closure(was_connected=False) self._closure()
return return
else: else:
self.connecting = False self.connecting = False
......
...@@ -41,7 +41,7 @@ class EventHandler(object): ...@@ -41,7 +41,7 @@ class EventHandler(object):
neo.lib.logging.error(message) neo.lib.logging.error(message)
conn.answer(Errors.ProtocolError(message)) conn.answer(Errors.ProtocolError(message))
conn.abort() conn.abort()
self.peerBroken(conn) # self.peerBroken(conn)
def dispatch(self, conn, packet): def dispatch(self, conn, packet):
"""This is a helper method to handle various packet types.""" """This is a helper method to handle various packet types."""
...@@ -59,23 +59,20 @@ class EventHandler(object): ...@@ -59,23 +59,20 @@ class EventHandler(object):
neo.lib.logging.error('malformed packet from %r', conn) neo.lib.logging.error('malformed packet from %r', conn)
conn.notify(Packets.Notify('Malformed packet: %r' % (packet, ))) conn.notify(Packets.Notify('Malformed packet: %r' % (packet, )))
conn.abort() conn.abort()
self.peerBroken(conn) # self.peerBroken(conn)
except BrokenNodeDisallowedError: except BrokenNodeDisallowedError:
conn.answer(Errors.BrokenNode('go away')) conn.answer(Errors.BrokenNode('go away'))
conn.abort() conn.abort()
self.connectionClosed(conn)
except NotReadyError, message: except NotReadyError, message:
if not message.args: if not message.args:
message = 'Retry Later' message = 'Retry Later'
message = str(message) message = str(message)
conn.answer(Errors.NotReady(message)) conn.answer(Errors.NotReady(message))
conn.abort() conn.abort()
self.connectionClosed(conn)
except ProtocolError, message: except ProtocolError, message:
message = str(message) message = str(message)
conn.answer(Errors.ProtocolError(message)) conn.answer(Errors.ProtocolError(message))
conn.abort() conn.abort()
self.connectionClosed(conn)
def checkClusterName(self, name): def checkClusterName(self, name):
# raise an exception if the given name mismatch the current cluster name # raise an exception if the given name mismatch the current cluster name
...@@ -106,20 +103,15 @@ class EventHandler(object): ...@@ -106,20 +103,15 @@ class EventHandler(object):
def connectionAccepted(self, conn): def connectionAccepted(self, conn):
"""Called when a connection is accepted.""" """Called when a connection is accepted."""
def timeoutExpired(self, conn):
"""Called when a timeout event occurs."""
neo.lib.logging.debug('timeout expired for %r', conn)
self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
def connectionClosed(self, conn): def connectionClosed(self, conn):
"""Called when a connection is closed by the peer.""" """Called when a connection is closed by the peer."""
neo.lib.logging.debug('connection closed for %r', conn) neo.lib.logging.debug('connection closed for %r', conn)
self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN) self.connectionLost(conn, NodeStates.TEMPORARILY_DOWN)
def peerBroken(self, conn): #def peerBroken(self, conn):
"""Called when a peer is broken.""" # """Called when a peer is broken."""
neo.lib.logging.error('%r is broken', conn) # neo.lib.logging.error('%r is broken', conn)
self.connectionLost(conn, NodeStates.BROKEN) # # NodeStates.BROKEN
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
""" this is a method to override in sub-handlers when there is no need """ this is a method to override in sub-handlers when there is no need
......
...@@ -334,9 +334,7 @@ class Application(object): ...@@ -334,9 +334,7 @@ class Application(object):
if node.isStorage() or node.isClient(): if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation()) node.notify(Packets.StopOperation())
if node.isClient(): if node.isClient():
conn = node.getConnection() node.getConnection().abort()
conn.abort()
conn.getHandler().connectionClosed(conn)
# Then, go back, and restart. # Then, go back, and restart.
return return
...@@ -351,18 +349,20 @@ class Application(object): ...@@ -351,18 +349,20 @@ class Application(object):
self.listening_conn.setHandler( self.listening_conn.setHandler(
identification.IdentificationHandler(self)) identification.IdentificationHandler(self))
handler = secondary.SecondaryMasterHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
# Make sure that every connection has the secondary event handler. # Close all remaining connections to other masters,
# for the same reason as in playSecondaryRole.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
conn_uuid = conn.getUUID() conn_uuid = conn.getUUID()
if conn_uuid is not None: if conn_uuid is not None:
node = nm.getByUUID(conn_uuid) node = nm.getByUUID(conn_uuid)
assert node is not None assert node is not None
assert node.isMaster() assert node.isMaster() and not conn.isClient()
conn.setHandler(handler) assert node._connection is None and node.isUnknown()
# this may trigger 'unexpected answer' warnings on remote side
conn.close()
# If I know any storage node, make sure that they are not in the # If I know any storage node, make sure that they are not in the
# running state, because they are not connected at this stage. # running state, because they are not connected at this stage.
...@@ -392,22 +392,14 @@ class Application(object): ...@@ -392,22 +392,14 @@ class Application(object):
# election timeout # election timeout
raise ElectionFailure("Election timeout") raise ElectionFailure("Election timeout")
# Now I need only a connection to the primary master node. # Restart completely. Non-optimized
# but lower level code needs to be stabilized first.
addr = self.primary_master_node.getAddress() addr = self.primary_master_node.getAddress()
for conn in self.em.getServerList(): for conn in self.em.getConnectionList():
conn.close() conn.close()
connected_to_master = False
# Reconnect to primary master node.
primary_handler = secondary.PrimaryHandler(self) primary_handler = secondary.PrimaryHandler(self)
for conn in self.em.getClientList():
if conn.getAddress() == addr:
connected_to_master = True
conn.setHandler(primary_handler)
else:
conn.close()
if not connected_to_master:
ClientConnection(self.em, primary_handler, addr=addr, ClientConnection(self.em, primary_handler, addr=addr,
connector=self.connector_handler()) connector=self.connector_handler())
...@@ -442,18 +434,20 @@ class Application(object): ...@@ -442,18 +434,20 @@ class Application(object):
# change handlers # change handlers
notification_packet = Packets.NotifyClusterInformation(state) notification_packet = Packets.NotifyClusterInformation(state)
for node in self.nm.getIdentifiedList(): for node in self.nm.getIdentifiedList():
if not node.isMaster(): if node.isMaster():
node.notify(notification_packet)
if node.isAdmin() or node.isMaster():
# those node types keep their own handler
continue continue
conn = node.getConnection() conn = node.getConnection()
if node.isClient() and conn.isAborted():
continue
node.notify(notification_packet)
if node.isClient(): if node.isClient():
if state != ClusterStates.RUNNING: if state != ClusterStates.RUNNING:
conn.close() conn.close()
handler = client_handler handler = client_handler
elif node.isStorage(): elif node.isStorage():
handler = storage_handler handler = storage_handler
else:
continue # keep handler
conn.setHandler(handler) conn.setHandler(handler)
handler.connectionCompleted(conn) handler.connectionCompleted(conn)
self.cluster_state = state self.cluster_state = state
......
...@@ -81,7 +81,8 @@ class BaseServiceHandler(MasterHandler): ...@@ -81,7 +81,8 @@ class BaseServiceHandler(MasterHandler):
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID()) node = self.app.nm.getByUUID(conn.getUUID())
assert node is not None if node is None:
return # for example, when a storage is removed by an admin
if new_state != NodeStates.BROKEN: if new_state != NodeStates.BROKEN:
new_state = DISCONNECTED_STATE_DICT.get(node.getType(), new_state = DISCONNECTED_STATE_DICT.get(node.getType(),
NodeStates.DOWN) NodeStates.DOWN)
......
...@@ -32,12 +32,6 @@ class ClientElectionHandler(MasterHandler): ...@@ -32,12 +32,6 @@ class ClientElectionHandler(MasterHandler):
def askPrimary(self, conn): def askPrimary(self, conn):
raise UnexpectedPacketError, "askPrimary on server connection" raise UnexpectedPacketError, "askPrimary on server connection"
def packetReceived(self, conn, packet):
node = self.app.nm.getByAddress(conn.getAddress())
if not node.isBroken():
node.setRunning()
MasterHandler.packetReceived(self, conn, packet)
def connectionStarted(self, conn): def connectionStarted(self, conn):
addr = conn.getAddress() addr = conn.getAddress()
# connection in progress # connection in progress
...@@ -57,18 +51,11 @@ class ClientElectionHandler(MasterHandler): ...@@ -57,18 +51,11 @@ class ClientElectionHandler(MasterHandler):
MasterHandler.connectionFailed(self, conn) MasterHandler.connectionFailed(self, conn)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
addr = conn.getAddress()
node = self.app.nm.getByAddress(addr)
# connection successfull, set it as running
node.setRunning()
conn.ask(Packets.AskPrimary()) conn.ask(Packets.AskPrimary())
MasterHandler.connectionCompleted(self, conn) MasterHandler.connectionCompleted(self, conn)
def connectionLost(self, conn, new_state): def connectionLost(self, conn, new_state):
addr = conn.getAddress() addr = conn.getAddress()
node = self.app.nm.getByAddress(addr)
if new_state != NodeStates.BROKEN or node is not None:
node.setState(new_state)
self.app.negotiating_master_node_set.discard(addr) self.app.negotiating_master_node_set.discard(addr)
def acceptIdentification(self, conn, node_type, def acceptIdentification(self, conn, node_type,
...@@ -79,7 +66,6 @@ class ClientElectionHandler(MasterHandler): ...@@ -79,7 +66,6 @@ class ClientElectionHandler(MasterHandler):
# The peer is not a master node! # The peer is not a master node!
neo.lib.logging.error('%r is not a master node', conn) neo.lib.logging.error('%r is not a master node', conn)
app.nm.remove(node) app.nm.remove(node)
app.negotiating_master_node_set.discard(node.getAddress())
conn.close() conn.close()
return return
...@@ -172,11 +158,6 @@ class ServerElectionHandler(MasterHandler): ...@@ -172,11 +158,6 @@ class ServerElectionHandler(MasterHandler):
def reelectPrimary(self, conn): def reelectPrimary(self, conn):
raise ElectionFailure, 'reelection requested' raise ElectionFailure, 'reelection requested'
def connectionLost(self, conn, new_state):
node = self.app.nm.getByUUID(conn.getUUID())
if node is not None:
node.setState(new_state)
def requestIdentification(self, conn, node_type, def requestIdentification(self, conn, node_type,
uuid, address, name): uuid, address, name):
self.checkClusterName(name) self.checkClusterName(name)
......
...@@ -28,9 +28,6 @@ class SecondaryMasterHandler(MasterHandler): ...@@ -28,9 +28,6 @@ class SecondaryMasterHandler(MasterHandler):
node.setDown() node.setDown()
self.app.broadcastNodesInformation([node]) self.app.broadcastNodesInformation([node])
def connectionCompleted(self, conn):
pass
def announcePrimary(self, conn): def announcePrimary(self, conn):
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
......
...@@ -42,14 +42,6 @@ class CommandEventHandler(EventHandler): ...@@ -42,14 +42,6 @@ class CommandEventHandler(EventHandler):
super(CommandEventHandler, self).connectionFailed(conn) super(CommandEventHandler, self).connectionFailed(conn)
self.__disconnected() self.__disconnected()
def timeoutExpired(self, conn):
super(CommandEventHandler, self).timeoutExpired(conn)
self.__disconnected()
def peerBroken(self, conn):
super(CommandEventHandler, self).peerBroken(conn)
self.__disconnected()
def ack(self, conn, msg): def ack(self, conn, msg):
self.__respond((Packets.Error, ErrorCodes.ACK, msg)) self.__respond((Packets.Error, ErrorCodes.ACK, msg))
......
...@@ -51,7 +51,11 @@ class BaseMasterHandler(EventHandler): ...@@ -51,7 +51,11 @@ class BaseMasterHandler(EventHandler):
neo.lib.logging.info("I was told I'm %s" %(state)) neo.lib.logging.info("I was told I'm %s" %(state))
if state in (NodeStates.DOWN, NodeStates.TEMPORARILY_DOWN, if state in (NodeStates.DOWN, NodeStates.TEMPORARILY_DOWN,
NodeStates.BROKEN): NodeStates.BROKEN):
try:
conn.close() conn.close()
assert False
except PrimaryFailure:
pass
erase = state == NodeStates.DOWN erase = state == NodeStates.DOWN
self.app.shutdown(erase=erase) self.app.shutdown(erase=erase)
elif state == NodeStates.HIDDEN: elif state == NodeStates.HIDDEN:
......
...@@ -210,6 +210,15 @@ class MasterNotificationsHandlerTests(MasterHandlerTests): ...@@ -210,6 +210,15 @@ class MasterNotificationsHandlerTests(MasterHandlerTests):
# connections closed # connections closed
self.checkClosed(conn1) self.checkClosed(conn1)
self.checkClosed(conn2) self.checkClosed(conn2)
return conn2
def test_notifyNodeInformation_checkUnregisterStorage(self):
# XXX: This test fails because unregistering is done
# by neo.client.handlers.storage.StorageEventHandler
# which would require a connection to storage
# with a proper handler (defined by Application).
# It can be merged with previous one as soon as it passes.
conn2 = self.test_notifyNodeInformation()
# storage removed from connection pool # storage removed from connection pool
remove_calls = self.app.cp.mockGetNamedCalls('removeConnection') remove_calls = self.app.cp.mockGetNamedCalls('removeConnection')
self.assertEqual(len(remove_calls), 1) self.assertEqual(len(remove_calls), 1)
......
...@@ -162,7 +162,7 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -162,7 +162,7 @@ class MasterClientHandlerTests(NeoUnitTestBase):
(node_list, ) = packet.decode() (node_list, ) = packet.decode()
self.assertEqual(len(node_list), 2) self.assertEqual(len(node_list), 2)
def __testWithMethod(self, method, state): def test_connectionClosed(self):
# give a client uuid which have unfinished transactions # give a client uuid which have unfinished transactions
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port = self.client_port) port = self.client_port)
...@@ -170,22 +170,11 @@ class MasterClientHandlerTests(NeoUnitTestBase): ...@@ -170,22 +170,11 @@ class MasterClientHandlerTests(NeoUnitTestBase):
lptid = self.app.pt.getID() lptid = self.app.pt.getID()
self.assertEqual(self.app.nm.getByUUID(client_uuid).getState(), self.assertEqual(self.app.nm.getByUUID(client_uuid).getState(),
NodeStates.RUNNING) NodeStates.RUNNING)
method(conn) self.service.connectionClosed(conn)
# node must be have been remove, and no more transaction must remains # node must be have been remove, and no more transaction must remains
self.assertEqual(self.app.nm.getByUUID(client_uuid), None) self.assertEqual(self.app.nm.getByUUID(client_uuid), None)
self.assertEqual(lptid, self.app.pt.getID()) self.assertEqual(lptid, self.app.pt.getID())
def test_15_peerBroken(self):
self.__testWithMethod(self.service.peerBroken, NodeStates.BROKEN)
def test_16_timeoutExpired(self):
self.__testWithMethod(self.service.timeoutExpired,
NodeStates.TEMPORARILY_DOWN)
def test_17_connectionClosed(self):
self.__testWithMethod(self.service.connectionClosed,
NodeStates.TEMPORARILY_DOWN)
def test_askPack(self): def test_askPack(self):
self.assertEqual(self.app.packing, None) self.assertEqual(self.app.packing, None)
self.app.nm.createClient() self.app.nm.createClient()
......
...@@ -100,12 +100,11 @@ class MasterClientElectionTests(NeoUnitTestBase): ...@@ -100,12 +100,11 @@ class MasterClientElectionTests(NeoUnitTestBase):
self._checkUnconnected(node) self._checkUnconnected(node)
self.election.connectionCompleted(conn) self.election.connectionCompleted(conn)
self._checkUnconnected(node) self._checkUnconnected(node)
self.assertTrue(node.isRunning()) self.assertTrue(node.isUnknown())
self.checkAskPrimary(conn) self.checkAskPrimary(conn)
def _setNegociating(self, node): def _setNegociating(self, node):
self._checkUnconnected(node) self._checkUnconnected(node)
node.setRunning()
addr = node.getAddress() addr = node.getAddress()
self.app.negotiating_master_node_set.add(addr) self.app.negotiating_master_node_set.add(addr)
self.app.unconnected_master_node_set.discard(addr) self.app.unconnected_master_node_set.discard(addr)
...@@ -115,16 +114,7 @@ class MasterClientElectionTests(NeoUnitTestBase): ...@@ -115,16 +114,7 @@ class MasterClientElectionTests(NeoUnitTestBase):
node, conn = self.identifyToMasterNode() node, conn = self.identifyToMasterNode()
self._setNegociating(node) self._setNegociating(node)
self.election.connectionClosed(conn) self.election.connectionClosed(conn)
self.assertTrue(node.isTemporarilyDown()) self.assertTrue(node.isUnknown())
addr = node.getAddress()
self.assertFalse(addr in self.app.unconnected_master_node_set)
self.assertFalse(addr in self.app.negotiating_master_node_set)
def test_timeoutExpired(self):
node, conn = self.identifyToMasterNode()
self._setNegociating(node)
self.election.timeoutExpired(conn)
self.assertTrue(node.isTemporarilyDown())
addr = node.getAddress() addr = node.getAddress()
self.assertFalse(addr in self.app.unconnected_master_node_set) self.assertFalse(addr in self.app.unconnected_master_node_set)
self.assertFalse(addr in self.app.negotiating_master_node_set) self.assertFalse(addr in self.app.negotiating_master_node_set)
......
...@@ -68,26 +68,6 @@ class MasterRecoveryTests(NeoUnitTestBase): ...@@ -68,26 +68,6 @@ class MasterRecoveryTests(NeoUnitTestBase):
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(), self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.TEMPORARILY_DOWN) NodeStates.TEMPORARILY_DOWN)
def test_02_timeoutExpired(self):
uuid = self.identifyToMasterNode(node_type=NodeTypes.MASTER, port=self.master_port)
conn = self.getFakeConnection(uuid, self.master_address)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.RUNNING)
self.recovery.timeoutExpired(conn)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.TEMPORARILY_DOWN)
def test_03_peerBroken(self):
uuid = self.identifyToMasterNode(node_type=NodeTypes.MASTER, port=self.master_port)
conn = self.getFakeConnection(uuid, self.master_address)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.RUNNING)
self.recovery.peerBroken(conn)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.BROKEN)
def test_09_answerLastIDs(self): def test_09_answerLastIDs(self):
recovery = self.recovery recovery = self.recovery
uuid = self.identifyToMasterNode() uuid = self.identifyToMasterNode()
......
...@@ -154,7 +154,9 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -154,7 +154,9 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
max_tid, tid_list = self.checkAnswerUnfinishedTransactions(conn, decode=True) max_tid, tid_list = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 1) self.assertEqual(len(tid_list), 1)
def _testWithMethod(self, method, state): def test_connectionClosed(self):
method = self.service.connectionClosed
state = NodeStates.TEMPORARILY_DOWN
# define two nodes # define two nodes
node1, conn1 = self.identifyToMasterNode() node1, conn1 = self.identifyToMasterNode()
node2, conn2 = self.identifyToMasterNode() node2, conn2 = self.identifyToMasterNode()
...@@ -178,17 +180,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase): ...@@ -178,17 +180,6 @@ class MasterStorageHandlerTests(NeoUnitTestBase):
self.assertEqual(node2.getState(), state) self.assertEqual(node2.getState(), state)
self.assertEqual(lptid, self.app.pt.getID()) self.assertEqual(lptid, self.app.pt.getID())
def test_15_peerBroken(self):
self._testWithMethod(self.service.peerBroken, NodeStates.BROKEN)
def test_16_timeoutExpired(self):
self._testWithMethod(self.service.timeoutExpired,
NodeStates.TEMPORARILY_DOWN)
def test_17_connectionClosed(self):
self._testWithMethod(self.service.connectionClosed,
NodeStates.TEMPORARILY_DOWN)
def test_nodeLostAfterAskLockInformation(self): def test_nodeLostAfterAskLockInformation(self):
# 2 storage nodes, one will die # 2 storage nodes, one will die
node1, conn1 = self._getStorage() node1, conn1 = self._getStorage()
......
...@@ -72,26 +72,6 @@ class MasterVerificationTests(NeoUnitTestBase): ...@@ -72,26 +72,6 @@ class MasterVerificationTests(NeoUnitTestBase):
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(), self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.TEMPORARILY_DOWN) NodeStates.TEMPORARILY_DOWN)
def test_02_timeoutExpired(self):
# test a storage, must raise as cluster no longer op
uuid = self.identifyToMasterNode()
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.UNKNOWN)
self.assertRaises(VerificationFailure, self.verification.connectionClosed,conn)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.TEMPORARILY_DOWN)
def test_03_peerBroken(self):
# test a storage, must raise as cluster no longer op
uuid = self.identifyToMasterNode()
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.UNKNOWN)
self.assertRaises(VerificationFailure, self.verification.connectionClosed,conn)
self.assertEqual(self.app.nm.getByAddress(conn.getAddress()).getState(),
NodeStates.TEMPORARILY_DOWN)
def _test_09_answerLastIDs(self): def _test_09_answerLastIDs(self):
# XXX: test disabled, should be an unexpected packet # XXX: test disabled, should be an unexpected packet
verification = self.verification verification = self.verification
......
...@@ -51,24 +51,12 @@ class StorageInitializationHandlerTests(NeoUnitTestBase): ...@@ -51,24 +51,12 @@ class StorageInitializationHandlerTests(NeoUnitTestBase):
address = ("127.0.0.1", self.client_port) address = ("127.0.0.1", self.client_port)
return self.getFakeConnection(uuid=self.getNewUUID(), address=address) return self.getFakeConnection(uuid=self.getNewUUID(), address=address)
def test_02_timeoutExpired(self):
conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.timeoutExpired, conn,)
# nothing happens
self.checkNoPacketSent(conn)
def test_03_connectionClosed(self): def test_03_connectionClosed(self):
conn = self.getClientConnection() conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.connectionClosed, conn,) self.assertRaises(PrimaryFailure, self.verification.connectionClosed, conn,)
# nothing happens # nothing happens
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
def test_04_peerBroken(self):
conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.peerBroken, conn,)
# nothing happens
self.checkNoPacketSent(conn)
def test_09_answerPartitionTable(self): def test_09_answerPartitionTable(self):
# send a table # send a table
conn = self.getClientConnection() conn = self.getClientConnection()
......
...@@ -57,24 +57,12 @@ class StorageMasterHandlerTests(NeoUnitTestBase): ...@@ -57,24 +57,12 @@ class StorageMasterHandlerTests(NeoUnitTestBase):
address = ("127.0.0.1", self.master_port) address = ("127.0.0.1", self.master_port)
return self.getFakeConnection(uuid=self.master_uuid, address=address) return self.getFakeConnection(uuid=self.master_uuid, address=address)
def test_06_timeoutExpired(self):
# client connection
conn = self.getMasterConnection()
self.assertRaises(PrimaryFailure, self.operation.timeoutExpired, conn)
self.checkNoPacketSent(conn)
def test_07_connectionClosed2(self): def test_07_connectionClosed2(self):
# primary has closed the connection # primary has closed the connection
conn = self.getMasterConnection() conn = self.getMasterConnection()
self.assertRaises(PrimaryFailure, self.operation.connectionClosed, conn) self.assertRaises(PrimaryFailure, self.operation.connectionClosed, conn)
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
def test_08_peerBroken(self):
# client connection
conn = self.getMasterConnection()
self.assertRaises(PrimaryFailure, self.operation.peerBroken, conn)
self.checkNoPacketSent(conn)
def test_14_notifyPartitionChanges1(self): def test_14_notifyPartitionChanges1(self):
# old partition change -> do nothing # old partition change -> do nothing
app = self.app app = self.app
......
...@@ -56,24 +56,12 @@ class StorageVerificationHandlerTests(NeoUnitTestBase): ...@@ -56,24 +56,12 @@ class StorageVerificationHandlerTests(NeoUnitTestBase):
return self.getFakeConnection(address=("127.0.0.1", self.master_port)) return self.getFakeConnection(address=("127.0.0.1", self.master_port))
# Tests # Tests
def test_02_timeoutExpired(self):
conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.timeoutExpired, conn,)
# nothing happens
self.checkNoPacketSent(conn)
def test_03_connectionClosed(self): def test_03_connectionClosed(self):
conn = self.getClientConnection() conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.connectionClosed, conn,) self.assertRaises(PrimaryFailure, self.verification.connectionClosed, conn,)
# nothing happens # nothing happens
self.checkNoPacketSent(conn) self.checkNoPacketSent(conn)
def test_04_peerBroken(self):
conn = self.getClientConnection()
self.assertRaises(PrimaryFailure, self.verification.peerBroken, conn,)
# nothing happens
self.checkNoPacketSent(conn)
def test_07_askLastIDs(self): def test_07_askLastIDs(self):
conn = self.getClientConnection() conn = self.getClientConnection()
last_ptid = self.getPTID(1) last_ptid = self.getPTID(1)
......
...@@ -599,7 +599,7 @@ class ConnectionTests(NeoUnitTestBase): ...@@ -599,7 +599,7 @@ class ConnectionTests(NeoUnitTestBase):
# test send was called # test send was called
self._checkSend(1, "testdata") self._checkSend(1, "testdata")
self._checkWriteBuf(bc, '') self._checkWriteBuf(bc, '')
self._checkConnectionClosed(0) self._checkConnectionClosed(1)
self._checkUnregistered(1) self._checkUnregistered(1)
# nothing else pending, and aborted is false, so writer has been removed # nothing else pending, and aborted is false, so writer has been removed
self.assertFalse(bc.pending()) self.assertFalse(bc.pending())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment