Commit d925f14d authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split master election handler in two:

- One the incoming (server) connections 
- Another for initiated (client) connections
Remove checks and decorators no more usefull.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@724 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent b0fe5e6d
...@@ -34,7 +34,7 @@ from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, ...@@ -34,7 +34,7 @@ from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure,
OperationFailure OperationFailure
from neo.master.identification import IdentificationEventHandler from neo.master.identification import IdentificationEventHandler
from neo.master.administration import AdministrationEventHandler from neo.master.administration import AdministrationEventHandler
from neo.master.election import ElectionEventHandler from neo.master.election import ClientElectionEventHandler, ServerElectionEventHandler
from neo.master.recovery import RecoveryEventHandler from neo.master.recovery import RecoveryEventHandler
from neo.master.verification import VerificationEventHandler from neo.master.verification import VerificationEventHandler
from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler from neo.master.service import ClientServiceEventHandler, StorageServiceEventHandler
...@@ -116,7 +116,7 @@ class Application(object): ...@@ -116,7 +116,7 @@ class Application(object):
except (ElectionFailure, PrimaryFailure): except (ElectionFailure, PrimaryFailure):
# Forget all connections. # Forget all connections.
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if not isinstance(conn, ListeningConnection): if not conn.isListeningConnection():
conn.close() conn.close()
# Reelect a new primary master. # Reelect a new primary master.
self.electPrimary(bootstrap = False) self.electPrimary(bootstrap = False)
...@@ -132,14 +132,11 @@ class Application(object): ...@@ -132,14 +132,11 @@ class Application(object):
self.unconnected_master_node_set = set() self.unconnected_master_node_set = set()
self.negotiating_master_node_set = set() self.negotiating_master_node_set = set()
handler = ElectionEventHandler(self) self.listening_conn.setHandler(ServerElectionEventHandler(self))
client_handler = ClientElectionEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
# Make sure that every connection has the election event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
while 1: while 1:
t = 0 t = 0
self.primary = None self.primary = None
...@@ -178,7 +175,7 @@ class Application(object): ...@@ -178,7 +175,7 @@ class Application(object):
# Try to connect to master nodes. # Try to connect to master nodes.
if self.unconnected_master_node_set: if self.unconnected_master_node_set:
for addr in list(self.unconnected_master_node_set): for addr in list(self.unconnected_master_node_set):
ClientConnection(em, handler, addr = addr, ClientConnection(em, client_handler, addr = addr,
connector_handler = self.connector_handler) connector_handler = self.connector_handler)
if len(self.unconnected_master_node_set) == 0 \ if len(self.unconnected_master_node_set) == 0 \
and len(self.negotiating_master_node_set) == 0: and len(self.negotiating_master_node_set) == 0:
...@@ -270,7 +267,7 @@ class Application(object): ...@@ -270,7 +267,7 @@ class Application(object):
# Close all connections. # Close all connections.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
if not isinstance(conn, ListeningConnection): if not conn.isListeningConnection():
conn.close() conn.close()
bootstrap = False bootstrap = False
......
...@@ -31,6 +31,57 @@ from neo import decorators ...@@ -31,6 +31,57 @@ from neo import decorators
class ElectionEventHandler(MasterEventHandler): class ElectionEventHandler(MasterEventHandler):
"""This class deals with events for a primary master election.""" """This class deals with events for a primary master election."""
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
raise protocol.UnexpectedPacketError
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
# No interest.
continue
# Register new master nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr)
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if node.getUUID() is None:
node.setUUID(uuid)
if state in (node.getState(), RUNNING_STATE):
# No change. Don't care.
continue
if state == RUNNING_STATE:
# No problem.
continue
# Something wrong happened possibly. Cut the connection to
# this node, if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
node.setState(state)
class ClientElectionEventHandler(MasterEventHandler):
def packetReceived(self, conn, packet):
node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet)
def connectionStarted(self, conn): def connectionStarted(self, conn):
app = self.app app = self.app
addr = conn.getAddress() addr = conn.getAddress()
...@@ -42,6 +93,14 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -42,6 +93,14 @@ class ElectionEventHandler(MasterEventHandler):
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPrimaryMaster())
MasterEventHandler.connectionCompleted(self, conn) MasterEventHandler.connectionCompleted(self, conn)
def connectionClosed(self, conn):
self.connectionFailed(conn)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
self.connectionFailed(conn)
MasterEventHandler.timeoutExpired(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
app = self.app app = self.app
addr = conn.getAddress() addr = conn.getAddress()
...@@ -54,37 +113,15 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -54,37 +113,15 @@ class ElectionEventHandler(MasterEventHandler):
app.unconnected_master_node_set.add(addr) app.unconnected_master_node_set.add(addr)
MasterEventHandler.connectionFailed(self, conn) MasterEventHandler.connectionFailed(self, conn)
def connectionClosed(self, conn):
if not conn.isServerConnection():
self.connectionFailed(conn)
MasterEventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
if not conn.isServerConnection():
self.connectionFailed(conn)
MasterEventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
app = self.app app = self.app
addr = conn.getAddress() addr = conn.getAddress()
node = app.nm.getNodeByServer(addr) node = app.nm.getNodeByServer(addr)
if not conn.isServerConnection(): if node is not None:
if node is not None: node.setState(DOWN_STATE)
node.setState(DOWN_STATE) app.negotiating_master_node_set.discard(addr)
app.negotiating_master_node_set.discard(addr)
else:
if node is not None and node.getUUID() is not None:
node.setState(BROKEN_STATE)
MasterEventHandler.peerBroken(self, conn) MasterEventHandler.peerBroken(self, conn)
def packetReceived(self, conn, packet):
if not conn.isServerConnection():
node = self.app.nm.getNodeByServer(conn.getAddress())
if node.getState() != BROKEN_STATE:
node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet)
@decorators.client_connection_required
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, num_partitions, uuid, ip_address, port, num_partitions,
num_replicas, your_uuid): num_replicas, your_uuid):
...@@ -121,7 +158,6 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -121,7 +158,6 @@ class ElectionEventHandler(MasterEventHandler):
app.negotiating_master_node_set.discard(conn.getAddress()) app.negotiating_master_node_set.discard(conn.getAddress())
@decorators.client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
app = self.app app = self.app
# Register new master nodes. # Register new master nodes.
...@@ -165,7 +201,20 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -165,7 +201,20 @@ class ElectionEventHandler(MasterEventHandler):
conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE, conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name)) app.uuid, app.server[0], app.server[1], app.name))
@decorators.server_connection_required
class ServerElectionEventHandler(MasterEventHandler):
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
def peerBroken(self, conn):
app = self.app
addr = conn.getAddress()
node = app.nm.getNodeByServer(addr)
if node is not None and node.getUUID() is not None:
node.setState(BROKEN_STATE)
MasterEventHandler.peerBroken(self, conn)
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name): uuid, ip_address, port, name):
self.checkClusterName(name) self.checkClusterName(name)
...@@ -197,62 +246,16 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -197,62 +246,16 @@ class ElectionEventHandler(MasterEventHandler):
app.pt.getReplicas(), uuid) app.pt.getReplicas(), uuid)
conn.answer(p, packet) conn.answer(p, packet)
@decorators.identification_required
@decorators.server_connection_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
if uuid is None:
raise protocol.UnexpectedPacketError
app = self.app app = self.app
if app.primary: if app.primary:
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
raise ElectionFailure, 'another primary arises' raise ElectionFailure, 'another primary arises'
node = app.nm.getNodeByUUID(uuid) node = app.nm.getNodeByUUID(uuid)
app.primary = False app.primary = False
app.primary_master_node = node app.primary_master_node = node
logging.info('%s is the primary', node) logging.info('%s is the primary', node)
def handleReelectPrimaryMaster(self, conn, packet):
raise ElectionFailure, 'reelection requested'
@decorators.identification_required
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
app = self.app
for node_type, ip_address, port, uuid, state in node_list:
if node_type != MASTER_NODE_TYPE:
# No interest.
continue
# Register new master nodes.
addr = (ip_address, port)
if app.server == addr:
# This is self.
continue
else:
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr)
app.nm.add(node)
app.unconnected_master_node_set.add(addr)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if node.getUUID() is None:
node.setUUID(uuid)
if node.getState() == state:
# No change. Don't care.
continue
if state == RUNNING_STATE:
# No problem.
continue
# Something wrong happened possibly. Cut the connection to
# this node, if any, and notify the information to others.
# XXX this can be very slow.
for c in app.em.getConnectionList():
if c.getUUID() == uuid:
c.close()
node.setState(state)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment