Commit f627727f authored by Vincent Pelletier's avatar Vincent Pelletier

Reorder connection initialisation packets for master connection:

- first, ask which master is the primary
- then, when connected to the primary, request node identification
- then, ask explicitly for the partition table and node information, to follow the query/answer schema.
Details:
- Master:
 - Move handleAskPrimaryMaster, handleAskNodeInformation and handleAskPartitionTable implementations to base handler to factorise code
 - handleRequestNodeIdentification is not required any more in secondary mode, only handleAskPrimaryMaster
- Client:
 - Make the bootstrap handler and master connection behave more like the storage node's equivalents
 - Remove PrimaryHandler (moved into PrimaryNotificationsHandler)
 - Parse the configured master node IP list just once, and register those nodes with the node manager just once (in app's __init__)


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@681 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 3a48be31
...@@ -162,9 +162,8 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -162,9 +162,8 @@ class MonitoringEventHandler(BaseEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE, # Ask a primary master.
app.uuid, app.server[0], app.server[1], app.name) conn.ask(protocol.askPrimaryMaster())
conn.ask(p)
EventHandler.connectionCompleted(self, conn) EventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -275,8 +274,8 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -275,8 +274,8 @@ class MonitoringEventHandler(BaseEventHandler):
# got an uuid from the primary master # got an uuid from the primary master
app.uuid = your_uuid app.uuid = your_uuid
# Ask a primary master. conn.ask(protocol.askNodeInformation())
conn.ask(protocol.askPrimaryMaster()) conn.ask(protocol.askPartitionTable([]))
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
...@@ -322,6 +321,9 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -322,6 +321,9 @@ class MonitoringEventHandler(BaseEventHandler):
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
p = protocol.requestNodeIdentification(ADMIN_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name)
conn.ask(p)
@decorators.identification_required @decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
...@@ -349,6 +351,10 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -349,6 +351,10 @@ class MonitoringEventHandler(BaseEventHandler):
pt.log() pt.log()
@decorators.identification_required
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
# Handler for the answer to askPartitionTable. It only logs that the
# packet arrived; ptid and row_list are not used here.
logging.warning("handleAnswerPartitionTable")
@decorators.identification_required @decorators.identification_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
logging.warning("handleNotifyPartitionChanges") logging.warning("handleNotifyPartitionChanges")
...@@ -442,6 +448,10 @@ class MonitoringEventHandler(BaseEventHandler): ...@@ -442,6 +448,10 @@ class MonitoringEventHandler(BaseEventHandler):
self.app.notified = True self.app.notified = True
@decorators.identification_required
def handleAnswerNodeInformation(self, conn, packet, node_list):
# Handler for the answer to askNodeInformation. It only logs that the
# packet arrived; node_list is not used here.
logging.info("handleAnswerNodeInformation")
def handleAnswerClusterState(self, conn, packet, state): def handleAnswerClusterState(self, conn, packet, state):
self.app.cluster_state = state self.app.cluster_state = state
......
...@@ -231,7 +231,21 @@ class Application(object): ...@@ -231,7 +231,21 @@ class Application(object):
self.pt = None self.pt = None
self.master_conn = None self.master_conn = None
self.primary_master_node = None self.primary_master_node = None
self.master_node_list = master_nodes.split(' ') self.trying_master_node = None
# XXX: this code duplicates neo.config.ConfigurationManager.getMasterNodeList
self.master_node_list = master_node_list = []
for node in master_nodes.split():
if not node:
continue
if ':' in node:
ip_address, port = node.split(':')
port = int(port)
else:
ip_address = node
port = 10100 # XXX: default_master_port
server = (ip_address, port)
master_node_list.append(server)
self.nm.add(MasterNode(server=server))
# no self-assigned UUID, primary master will supply us one # no self-assigned UUID, primary master will supply us one
self.uuid = INVALID_UUID self.uuid = INVALID_UUID
self.mq_cache = MQ() self.mq_cache = MQ()
...@@ -239,6 +253,7 @@ class Application(object): ...@@ -239,6 +253,7 @@ class Application(object):
self.ptid = INVALID_PTID self.ptid = INVALID_PTID
self.storage_handler = StorageAnswersHandler(self, self.dispatcher) self.storage_handler = StorageAnswersHandler(self, self.dispatcher)
self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher) self.primary_handler = PrimaryAnswersHandler(self, self.dispatcher)
self.primary_bootstrap_handler = PrimaryBootstrapHandler(self, self.dispatcher)
self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher) self.notifications_handler = PrimaryNotificationsHandler(self, self.dispatcher)
# Internal attribute distinct between thread # Internal attribute distinct between thread
self.local_var = ThreadContext() self.local_var = ThreadContext()
...@@ -352,70 +367,72 @@ class Application(object): ...@@ -352,70 +367,72 @@ class Application(object):
def _connectToPrimaryMasterNode(self): def _connectToPrimaryMasterNode(self):
logging.debug('connecting to primary master...') logging.debug('connecting to primary master...')
master_index = 0 ready = False
# Make application execute remaining message if any nm = self.nm
self._waitMessage() while not ready:
while True: # Get network connection to primary master
self.setNodeReady() index = 0
if self.primary_master_node in (None, -1): connected = False
# Try with master node defined in config while not connected:
if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node
else:
# Otherwise, check one by one.
master_list = nm.getMasterNodeList()
try:
self.trying_master_node = master_list[index]
except IndexError:
index = 0
self.trying_master_node = master_list[0]
index += 1
# Connect to master
conn = MTClientConnection(self.em, self.notifications_handler,
addr=self.trying_master_node.getServer(),
connector_handler=self.connector_handler)
# Query for primary master node
conn.lock()
try: try:
addr, port = self.master_node_list[master_index].split(':') msg_id = conn.ask(protocol.askPrimaryMaster())
except IndexError: self.dispatcher.register(conn, msg_id, self.local_var.queue)
master_index = 0 finally:
addr, port = self.master_node_list[master_index].split(':') conn.unlock()
port = int(port) self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
else: # If we reached the primary master node, mark as connected
addr, port = self.primary_master_node.getServer() connected = self.primary_master_node is not None \
# Request Node Identification and self.primary_master_node is self.trying_master_node
handler = PrimaryBootstrapHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port), # Identify to primary master and request initial data
connector_handler=self.connector_handler)
self._nm_acquire()
try:
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
finally:
self._nm_release()
conn.lock() conn.lock()
try: try:
p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE, p = protocol.requestNodeIdentification(CLIENT_NODE_TYPE,
self.uuid, '0.0.0.0', 0, self.name) self.uuid, '0.0.0.0', 0, self.name)
msg_id = conn.ask(p) msg_id = conn.ask(p)
self.dispatcher.register(conn, msg_id, self.local_var.queue) self.dispatcher.register(conn, msg_id, self.local_var.queue)
finally: finally:
conn.unlock() conn.unlock()
self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
# Wait for answer if self.uuid != INVALID_UUID:
while 1: # TODO: pipeline those 2 requests
self._waitMessage(handler=handler) # This is currently impossible because _waitMessage can only
# Now check result # wait on one message at a time
if self.primary_master_node is not None: conn.lock()
if self.primary_master_node == -1: try:
# Connection failed, try with another master node msg_id = conn.ask(protocol.askPartitionTable([]))
self.primary_master_node = None self.dispatcher.register(conn, msg_id, self.local_var.queue)
master_index += 1 finally:
break conn.unlock()
elif self.primary_master_node.getServer() != (addr, port): self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
# Master node changed, connect to new one conn.lock()
break try:
elif not self.isNodeReady(): msg_id = conn.ask(protocol.askNodeInformation())
# Wait a bit and reask again self.dispatcher.register(conn, msg_id, self.local_var.queue)
break finally:
elif self.pt is not None and self.pt.operational(): conn.unlock()
# Connected to primary master node self._waitMessage(conn, msg_id, handler=self.primary_bootstrap_handler)
break ready = self.uuid != INVALID_UUID and self.pt is not None \
sleep(0.1) and self.pt.operational()
if self.pt is not None and self.pt.operational() \
and self.uuid != INVALID_UUID:
# Connected to primary master node and got all informations
break
sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node) logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryNotificationsHandler(self, self.dispatcher))
return conn return conn
def registerDB(self, db, limit): def registerDB(self, db, limit):
......
...@@ -61,100 +61,63 @@ class BaseHandler(EventHandler): ...@@ -61,100 +61,63 @@ class BaseHandler(EventHandler):
else: else:
queue.put((conn, packet)) queue.put((conn, packet))
class PrimaryHandler(BaseHandler):
def handleNotifyNodeInformation(self, conn, packet, node_list): class PrimaryBootstrapHandler(BaseHandler):
app = self.app """ Bootstrap handler used when looking for the primary master """
nm = app.nm
for node_type, ip_address, port, uuid, state in node_list:
logging.info("notified of %s %s %d %s %s" %(node_type, ip_address, port, dump(uuid), state))
# Register new nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
# close connection to this node if no longer running
if node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE) and \
state != RUNNING_STATE:
for conn in self.app.em.getConnectionList():
if conn.getUUID() == n.getUUID():
conn.close()
break
if node_type == STORAGE_NODE_TYPE:
# Remove from pool connection
app.cp.removeConnection(n)
# Put fake packets to task queues.
queue_set = set()
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet if popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
def connectionCompleted(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection completed while not trying to connect')
super(PrimaryBootstrapHandler, self).connectionCompleted(conn)
def connectionFailed(self, conn):
app = self.app
if app.trying_master_node is None:
# Should not happen.
raise RuntimeError('connection failed while not trying to connect')
if app.trying_master_node is app.primary_master_node:
# Tried to connect to a primary master node and failed.
# So this would effectively mean that it is dead.
app.primary_master_node = None
class PrimaryBootstrapHandler(PrimaryHandler): app.trying_master_node = None
""" Bootstrap handler used when looking for the primary master """
def connectionFailed(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBootstrapHandler, self).connectionFailed(conn) super(PrimaryBootstrapHandler, self).connectionFailed(conn)
def timeoutExpired(self, conn):
app = self.app
if app.trying_master_node is app.primary_master_node:
# If a primary master node timeouts, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
if self.app.primary_master_node is None: app = self.app
self.app.primary_master_node = -1 if app.trying_master_node is app.primary_master_node:
# If a primary master node closes, I should not rely on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).connectionClosed(conn) super(PrimaryBootstrapHandler, self).connectionClosed(conn)
def peerBroken(self, conn): def peerBroken(self, conn):
if self.app.primary_master_node is None: app = self.app
self.app.primary_master_node = -1 if app.trying_master_node is app.primary_master_node:
# If a primary master node gets broken, I should not rely
# on it.
app.primary_master_node = None
app.trying_master_node = None
super(PrimaryBootstrapHandler, self).peerBroken(conn) super(PrimaryBootstrapHandler, self).peerBroken(conn)
def timeoutExpired(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBootstrapHandler, self).timeoutExpired(conn)
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
self.app.setNodeNotReady() app = self.app
if app.trying_master_node is not None:
app.trying_master_node = None
conn.close()
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas, your_uuid): num_partitions, num_replicas, your_uuid):
...@@ -192,16 +155,9 @@ class PrimaryBootstrapHandler(PrimaryHandler): ...@@ -192,16 +155,9 @@ class PrimaryBootstrapHandler(PrimaryHandler):
finally: finally:
conn.unlock() conn.unlock()
@decorators.identification_required def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): known_master_list):
uuid = conn.getUUID()
app = self.app app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by master node
if node.getNodeType() != MASTER_NODE_TYPE:
conn.close()
return
# Register new master nodes. # Register new master nodes.
for ip_address, port, uuid in known_master_list: for ip_address, port, uuid in known_master_list:
addr = (ip_address, port) addr = (ip_address, port)
...@@ -216,59 +172,37 @@ class PrimaryBootstrapHandler(PrimaryHandler): ...@@ -216,59 +172,37 @@ class PrimaryBootstrapHandler(PrimaryHandler):
n.setUUID(uuid) n.setUUID(uuid)
if primary_uuid != INVALID_UUID: if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid) primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None: if primary_node is None:
# I don't know such a node. Probably this information # I don't know such a node. Probably this information
# is old. So ignore it. # is old. So ignore it.
pass logging.warning('Unknown primary master UUID: %s. ' \
'Ignoring.' % dump(primary_uuid))
else: else:
if primary_node.getUUID() == primary_uuid: app.primary_master_node = primary_node
# Whatever the situation is, I trust this master. if app.trying_master_node is primary_node:
app.primary_master_node = primary_node # I am connected to the right one.
logging.info('connected to a primary master node')
@decorators.identification_required else:
def handleSendPartitionTable(self, conn, packet, ptid, row_list): app.trying_master_node = None
# This handler is in PrimaryBootstrapHandler, since this conn.close()
# basicaly is an answer to askPrimaryMaster. else:
# Extract from P-NEO-Protocol.Description: if app.primary_master_node is not None:
# Connection to primary master node (PMN in service state) # The primary master node is not a primary master node
# CN -> PMN : askPrimaryMaster # any longer.
# PMN -> CN : answerPrimaryMaster containing primary uuid and no app.primary_master_node = None
# known master list
# PMN -> CN : notifyNodeInformation containing list of all app.trying_master_node = None
# ASK_STORE_TRANSACTION# PMN -> CN : sendPartitionTable containing partition table id and conn.close()
# list of rows
# notifyNodeInformation is valid as asynchrounous event, but def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
# sendPartitionTable is only triggered after askPrimaryMaster. logging.info("handleAnswerPartitionTable")
uuid = conn.getUUID()
app = self.app def handleAnswerNodeInformation(self, conn, packet, node_list):
nm = app.nm logging.info("handleAnswerNodeInformation")
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
class PrimaryNotificationsHandler(PrimaryHandler): class PrimaryNotificationsHandler(BaseHandler):
""" Handler that process the notifications from the primary master """ """ Handler that process the notifications from the primary master """
def connectionClosed(self, conn): def connectionClosed(self, conn):
...@@ -328,6 +262,106 @@ class PrimaryNotificationsHandler(PrimaryHandler): ...@@ -328,6 +262,106 @@ class PrimaryNotificationsHandler(PrimaryHandler):
nm.add(node) nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
@decorators.identification_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# Load partition table rows sent by a master node into app.pt.
#
# ptid is compared with app.ptid; row_list is iterated as
# (offset, row) pairs, each row being (uuid, state) pairs.
#
# NOTE(review): the original comment said this handler lives in
# PrimaryBootstrapHandler; the surrounding diff hunk places it in
# PrimaryNotificationsHandler -- confirm.
# This basically is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
# CN -> PMN : askPrimaryMaster
# PMN -> CN : answerPrimaryMaster containing primary uuid and no
# known master list
# PMN -> CN : notifyNodeInformation containing list of all nodes
# (NOTE(review): this line was garbled by a stray paste; "nodes" is
# presumed -- verify against the protocol description)
# PMN -> CN : sendPartitionTable containing partition table id and
# list of rows
# notifyNodeInformation is valid as an asynchronous event, but
# sendPartitionTable is only triggered after askPrimaryMaster.
uuid = conn.getUUID()
app = self.app
nm = app.nm
pt = app.pt
# NOTE(review): node is dereferenced below without a None check, although
# getNodeByUUID results are checked against None further down.
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
# A different partition table ID is recorded and the local table cleared.
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
# Unknown UUIDs get a StorageNode created in TEMPORARILY_DOWN_STATE
# and registered in the node manager.
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list):
# Update the node manager from a node information notification.
#
# node_list is iterated as (node_type, ip_address, port, uuid, state)
# tuples. Master and storage nodes are created or refreshed in app.nm,
# client entries are skipped, and connections to master/storage nodes
# that are not in RUNNING_STATE are closed.
#
# NOTE(review): indentation was lost in this copy of the code, so the
# exact nesting of the trailing clean-up statements must be checked
# against the real file.
app = self.app
nm = app.nm
for node_type, ip_address, port, uuid, state in node_list:
logging.info("notified of %s %s %d %s %s" %(node_type, ip_address, port, dump(uuid), state))
# Register new nodes.
addr = (ip_address, port)
# Try to retrieve it from nm
n = None
if uuid != INVALID_UUID:
n = nm.getNodeByUUID(uuid)
if n is None:
n = nm.getNodeByServer(addr)
if n is not None and uuid != INVALID_UUID:
# node only exists by address, remove it
nm.remove(n)
n = None
elif n.getServer() != addr:
# same uuid but different address, remove it
nm.remove(n)
n = None
if node_type == MASTER_NODE_TYPE:
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
elif node_type == CLIENT_NODE_TYPE:
continue
# NOTE(review): for a node_type not handled above, n can still be None
# here, which would make the next call fail -- confirm whether other
# node types can appear in node_list.
n.setState(state)
# close connection to this node if no longer running
if node_type in (MASTER_NODE_TYPE, STORAGE_NODE_TYPE) and \
state != RUNNING_STATE:
# NOTE(review): this loop variable rebinds the `conn` parameter; the
# id(conn) test and queue.put((conn, None)) below use whatever
# connection it was last bound to, even when no UUID matched.
for conn in self.app.em.getConnectionList():
if conn.getUUID() == n.getUUID():
conn.close()
break
if node_type == STORAGE_NODE_TYPE:
# Remove from pool connection
app.cp.removeConnection(n)
# Put fake packets to task queues.
queue_set = set()
# Entries are popped while looping, so keys() must be a snapshot.
# NOTE(review): that holds for a Python 2 list; it would break with a
# Python 3 dict view.
for key in self.dispatcher.message_table.keys():
if id(conn) == key[0]:
queue = self.dispatcher.message_table.pop(key)
queue_set.add(queue)
# Storage failure is notified to the primary master when the fake
# packet is popped by a non-polling thread.
for queue in queue_set:
queue.put((conn, None))
class PrimaryAnswersHandler(BaseHandler): class PrimaryAnswersHandler(BaseHandler):
""" Handle that process expected packets from the primary master """ """ Handle that process expected packets from the primary master """
......
...@@ -39,11 +39,7 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -39,11 +39,7 @@ class ElectionEventHandler(MasterEventHandler):
MasterEventHandler.connectionStarted(self, conn) MasterEventHandler.connectionStarted(self, conn)
def connectionCompleted(self, conn): def connectionCompleted(self, conn):
app = self.app conn.ask(protocol.askPrimaryMaster())
# Request a node idenfitication.
p = protocol.requestNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.ask(p)
MasterEventHandler.connectionCompleted(self, conn) MasterEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -119,8 +115,11 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -119,8 +115,11 @@ class ElectionEventHandler(MasterEventHandler):
conn.setUUID(uuid) conn.setUUID(uuid)
node.setUUID(uuid) node.setUUID(uuid)
# Ask a primary master. if app.uuid < uuid:
conn.ask(protocol.askPrimaryMaster()) # I lost.
app.primary = False
app.negotiating_master_node_set.discard(conn.getAddress())
@decorators.client_connection_required @decorators.client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
...@@ -161,12 +160,10 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -161,12 +160,10 @@ class ElectionEventHandler(MasterEventHandler):
# Whatever the situation is, I trust this master. # Whatever the situation is, I trust this master.
app.primary = False app.primary = False
app.primary_master_node = primary_node app.primary_master_node = primary_node
else:
if app.uuid < conn.getUUID():
# I lost.
app.primary = False
app.negotiating_master_node_set.discard(conn.getAddress()) # Request a node idenfitication.
conn.ask(protocol.requestNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.name))
@decorators.server_connection_required @decorators.server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type, def handleRequestNodeIdentification(self, conn, packet, node_type,
...@@ -198,28 +195,6 @@ class ElectionEventHandler(MasterEventHandler): ...@@ -198,28 +195,6 @@ class ElectionEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.pt.getPartitions(), app.server[0], app.server[1], app.pt.getPartitions(),
app.pt.getReplicas(), uuid) app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
@decorators.identification_required
@decorators.server_connection_required
def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
app = self.app
if app.primary:
primary_uuid = app.uuid
elif app.primary_master_node is not None:
primary_uuid = app.primary_master_node.getUUID()
else:
primary_uuid = INVALID_UUID
known_master_list = []
for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE:
continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet) conn.answer(p, packet)
@decorators.identification_required @decorators.identification_required
......
...@@ -18,6 +18,8 @@ ...@@ -18,6 +18,8 @@
import logging import logging
from neo.handler import EventHandler from neo.handler import EventHandler
from neo.protocol import INVALID_UUID, BROKEN_STATE, ADMIN_NODE_TYPE
from neo import protocol
class MasterEventHandler(EventHandler): class MasterEventHandler(EventHandler):
"""This class implements a generic part of the event handlers.""" """This class implements a generic part of the event handlers."""
...@@ -29,9 +31,6 @@ class MasterEventHandler(EventHandler): ...@@ -29,9 +31,6 @@ class MasterEventHandler(EventHandler):
uuid, ip_address, port, name): uuid, ip_address, port, name):
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
def handleAskPrimaryMaster(self, conn, packet):
raise NotImplementedError('this method must be overridden')
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
raise NotImplementedError('this method must be overridden') raise NotImplementedError('this method must be overridden')
...@@ -85,3 +84,32 @@ class MasterEventHandler(EventHandler): ...@@ -85,3 +84,32 @@ class MasterEventHandler(EventHandler):
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
logging.error('ignoring notify partition changes in %s' % self.__class__.__name__) logging.error('ignoring notify partition changes in %s' % self.__class__.__name__)
def handleAskPrimaryMaster(self, conn, packet):
    """Answer an askPrimaryMaster request.

    The reply carries the UUID of the primary master as seen by this
    node (its own UUID when it is the primary, INVALID_UUID when no
    primary is known) and the list of known master nodes.
    """
    app = self.app
    # Work out which UUID to advertise as the primary master.
    if app.primary:
        primary_uuid = app.uuid
    else:
        primary_node = app.primary_master_node
        if primary_node is None:
            primary_uuid = INVALID_UUID
        else:
            primary_uuid = primary_node.getUUID()
    # This node is listed first, followed by every master node that is
    # not in BROKEN_STATE; a missing UUID is reported as INVALID_UUID.
    known_master_list = [app.server + (app.uuid, )]
    known_master_list.extend(
        node.getServer() + (node.getUUID() or INVALID_UUID, )
        for node in app.nm.getMasterNodeList()
        if node.getState() != BROKEN_STATE)
    answer = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
    conn.answer(answer, packet)
def handleAskNodeInformation(self, conn, packet):
    """Send node information to the peer, then answer the request.

    The node data goes out through app.sendNodesInformations(); the
    answer packet itself carries an empty list.
    """
    application = self.app
    application.sendNodesInformations(conn)
    reply = protocol.answerNodeInformation([])
    conn.answer(reply, packet)
def handleAskPartitionTable(self, conn, packet, offset_list):
    """Send the partition table to the peer, then answer the request.

    Only requests with an empty offset_list are accepted. The table is
    sent through app.sendPartitionTable(); the answer packet carries the
    partition table ID and an empty row list.
    """
    # Requests for specific offsets are not supported by this handler.
    assert len(offset_list) == 0
    master = self.app
    master.sendPartitionTable(conn)
    reply = protocol.answerPartitionTable(master.pt.getID(), [])
    conn.answer(reply, packet)
...@@ -163,32 +163,11 @@ class RecoveryEventHandler(MasterEventHandler): ...@@ -163,32 +163,11 @@ class RecoveryEventHandler(MasterEventHandler):
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE, p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1], app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(), uuid) app.pt.getPartitions(), app.pt.getReplicas(), uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@decorators.identification_required if node_type is STORAGE_NODE_TYPE:
def handleAskPrimaryMaster(self, conn, packet): # ask the last IDs.
uuid = conn.getUUID()
app = self.app
# Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because
# I must send all node information immediately.
p = protocol.answerPrimaryMaster(app.uuid, [])
conn.answer(p, packet)
# Send the information.
app.sendNodesInformations(conn)
# If this is a storage node, ask the last IDs.
node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() == STORAGE_NODE_TYPE:
conn.ask(protocol.askLastIDs()) conn.ask(protocol.askLastIDs())
elif node.getNodeType() == ADMIN_NODE_TYPE and app.pt.getID() != INVALID_PTID:
# send partition table if exists
logging.info('sending partition table %s to %s' %
(dump(app.pt.getID()), conn.getAddress()))
app.sendPartitionTable(conn)
@decorators.identification_required @decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
......
...@@ -56,48 +56,6 @@ class SecondaryEventHandler(MasterEventHandler): ...@@ -56,48 +56,6 @@ class SecondaryEventHandler(MasterEventHandler):
node.setState(RUNNING_STATE) node.setState(RUNNING_STATE)
MasterEventHandler.packetReceived(self, conn, packet) MasterEventHandler.packetReceived(self, conn, packet)
@decorators.server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
# Add a node only if it is a master node and I do not know it yet.
if node_type == MASTER_NODE_TYPE and uuid != INVALID_UUID:
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(MASTER_NODE_TYPE,
app.uuid, app.server[0], app.server[1],
app.pt.getPartitions(), app.pt.getReplicas(),
uuid)
# Next, the peer should ask a primary master node.
conn.answer(p, packet)
@decorators.identification_required
@decorators.server_connection_required
def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
app = self.app
primary_uuid = app.primary_master_node.getUUID()
known_master_list = []
for n in app.nm.getMasterNodeList():
if n.getState() == BROKEN_STATE:
continue
info = n.getServer() + (n.getUUID() or INVALID_UUID,)
known_master_list.append(info)
p = protocol.answerPrimaryMaster(primary_uuid, known_master_list)
conn.answer(p, packet)
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
raise UnexpectedPacketError raise UnexpectedPacketError
......
...@@ -271,30 +271,6 @@ class ServiceEventHandler(MasterEventHandler): ...@@ -271,30 +271,6 @@ class ServiceEventHandler(MasterEventHandler):
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@decorators.identification_required
def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
app = self.app
# Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because
# I must send all node information immediately.
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
# Send the information.
logging.info('sending notify node information to %s:%d', *(conn.getAddress()))
app.sendNodesInformations(conn)
# If this is a storage node or a client node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, ADMIN_NODE_TYPE):
logging.info('sending partition table to %s:%d', *(conn.getAddress()))
app.sendPartitionTable(conn)
# If this is a non-pending storage node, ask it to start.
if node.getNodeType() == STORAGE_NODE_TYPE and node.getState() != PENDING_STATE:
conn.notify(protocol.startOperation())
@decorators.identification_required @decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
# I am also the primary... So restart the election. # I am also the primary... So restart the election.
......
...@@ -189,24 +189,6 @@ class VerificationEventHandler(MasterEventHandler): ...@@ -189,24 +189,6 @@ class VerificationEventHandler(MasterEventHandler):
# Next, the peer should ask a primary master node. # Next, the peer should ask a primary master node.
conn.answer(p, packet) conn.answer(p, packet)
@decorators.identification_required
def handleAskPrimaryMaster(self, conn, packet):
uuid = conn.getUUID()
app = self.app
# Merely tell the peer that I am the primary master node.
# It is not necessary to send known master nodes, because
# I must send all node information immediately.
conn.answer(protocol.answerPrimaryMaster(app.uuid, []), packet)
# Send the information.
app.sendNodesInformations(conn)
# If this is a storage node or an admin node, send the partition table.
node = app.nm.getNodeByUUID(uuid)
if node.getNodeType() in (STORAGE_NODE_TYPE, ADMIN_NODE_TYPE):
app.sendPartitionTable(conn)
@decorators.identification_required @decorators.identification_required
def handleAnnouncePrimaryMaster(self, conn, packet): def handleAnnouncePrimaryMaster(self, conn, packet):
uuid = conn.getUUID() uuid = conn.getUUID()
......
...@@ -192,7 +192,27 @@ class Application(object): ...@@ -192,7 +192,27 @@ class Application(object):
t = 0 t = 0
while 1: while 1:
em.poll(1) em.poll(1)
if self.primary_master_node is not None: if self.trying_master_node is None:
if t + 1 < time():
# Choose a master node to connect to.
if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node
else:
# Otherwise, check one by one.
master_list = nm.getMasterNodeList()
try:
self.trying_master_node = master_list[index]
except IndexError:
index = 0
self.trying_master_node = master_list[0]
index += 1
ClientConnection(em, handler, \
addr = self.trying_master_node.getServer(),
connector_handler = self.connector_handler)
t = time()
elif self.primary_master_node is self.trying_master_node:
# If I know which is a primary master node, check if # If I know which is a primary master node, check if
# I have a connection to it already. # I have a connection to it already.
for conn in em.getConnectionList(): for conn in em.getConnectionList():
...@@ -204,26 +224,6 @@ class Application(object): ...@@ -204,26 +224,6 @@ class Application(object):
# Yes, I have. # Yes, I have.
return return
if self.trying_master_node is None and t + 1 < time():
# Choose a master node to connect to.
if self.primary_master_node is not None:
# If I know a primary master node, pinpoint it.
self.trying_master_node = self.primary_master_node
else:
# Otherwise, check one by one.
master_list = nm.getMasterNodeList()
try:
self.trying_master_node = master_list[index]
except IndexError:
index = 0
self.trying_master_node = master_list[0]
index += 1
ClientConnection(em, handler, \
addr = self.trying_master_node.getServer(),
connector_handler = self.connector_handler)
t = time()
def verifyData(self): def verifyData(self):
"""Verify data under the control by a primary master node. """Verify data under the control by a primary master node.
Connections from client nodes may not be accepted at this stage.""" Connections from client nodes may not be accepted at this stage."""
......
...@@ -38,9 +38,7 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -38,9 +38,7 @@ class BootstrapEventHandler(StorageEventHandler):
# Should not happen. # Should not happen.
raise RuntimeError('connection completed while not trying to connect') raise RuntimeError('connection completed while not trying to connect')
p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid, conn.ask(protocol.askPrimaryMaster())
app.server[0], app.server[1], app.name)
conn.ask(p)
StorageEventHandler.connectionCompleted(self, conn) StorageEventHandler.connectionCompleted(self, conn)
def connectionFailed(self, conn): def connectionFailed(self, conn):
...@@ -179,10 +177,13 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -179,10 +177,13 @@ class BootstrapEventHandler(StorageEventHandler):
logging.info('Got a new UUID from master : %s' % dump(app.uuid)) logging.info('Got a new UUID from master : %s' % dump(app.uuid))
conn.setUUID(uuid) conn.setUUID(uuid)
node.setUUID(uuid) #node.setUUID(uuid)
# Node UUID was set in handleAnswerPrimaryMaster
assert node.getUUID() == uuid
# Ask a primary master. # XXX: change handler for next packet (which might be handled in poll before it returns)
conn.ask(protocol.askPrimaryMaster()) # This should be removed when we will handle our own pending packet queue.
conn.setHandler(VerificationEventHandler(app))
@decorators.client_connection_required @decorators.client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
...@@ -213,10 +214,6 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -213,10 +214,6 @@ class BootstrapEventHandler(StorageEventHandler):
if app.trying_master_node is primary_node: if app.trying_master_node is primary_node:
# I am connected to the right one. # I am connected to the right one.
logging.info('connected to a primary master node') logging.info('connected to a primary master node')
# This is a workaround to prevent handling of
# packets for the verification phase.
handler = VerificationEventHandler(app)
conn.setHandler(handler)
else: else:
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
...@@ -228,6 +225,9 @@ class BootstrapEventHandler(StorageEventHandler): ...@@ -228,6 +225,9 @@ class BootstrapEventHandler(StorageEventHandler):
app.trying_master_node = None app.trying_master_node = None
conn.close() conn.close()
p = protocol.requestNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.name)
conn.ask(p)
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
logging.warning('/!\ handleAskLastIDs') logging.warning('/!\ handleAskLastIDs')
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment