Commit f949240a authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use nm.getIdentifiedList() instead of em.getConnectionList()

git-svn-id: https://svn.erp5.org/repos/neo/trunk@1819 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent c91048f9
......@@ -148,9 +148,9 @@ class PrimaryNotificationsHandler(BaseHandler):
for node_type, addr, uuid, state in node_list:
if state != NodeStates.RUNNING:
# close connection to this node if no longer running
conn_list = self.app.em.getConnectionListByUUID(uuid)
if conn_list:
conn = conn_list[0]
node = self.app.nm.getByUUID(uuid)
if node and node.isConnected():
conn = node.getConnection()
conn.close()
if node_type == NodeTypes.STORAGE:
# Remove from pool connection
......
......@@ -264,13 +264,10 @@ class Application(object):
assign_for_notification(NodeTypes.ADMIN)
# send at most one non-empty notification packet per node
for conn in self.em.getConnectionList():
if conn.getUUID() is None:
continue
node = self.nm.getByUUID(conn.getUUID())
for node in self.nm.getIdentifiedList():
node_list = node_dict.get(node.getType(), [])
if node_list:
conn.notify(Packets.NotifyNodeInformation(node_list))
node.notify(Packets.NotifyNodeInformation(node_list))
def broadcastPartitionChanges(self, cell_list):
"""Broadcast a Notify Partition Changes packet."""
......@@ -279,12 +276,9 @@ class Application(object):
return
ptid = self.pt.setNextID()
self.pt.log()
for c in self.em.getConnectionList():
n = self.nm.getByUUID(c.getUUID())
if n is None:
continue
if n.isClient() or n.isStorage() or n.isAdmin():
c.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
for node in self.nm.getIdentifiedList():
if node.isClient() or node.isStorage() or node.isAdmin():
node.notify(Packets.NotifyPartitionChanges(ptid, cell_list))
def outdateAndBroadcastPartition(self):
" Outdate cell of non-working nodes and broadcast changes """
......@@ -319,10 +313,9 @@ class Application(object):
def broadcastLastOID(self, oid):
logging.debug('Broadcast last OID to storages : %s' % dump(oid))
packet = Packets.NotifyLastOID(oid)
for conn in self.em.getConnectionList():
node = self.nm.getByUUID(conn.getUUID())
if node is not None and node.isStorage():
conn.notify(packet)
for node in self.nm.getIdentifiedList():
if node.isStorage():
node.notify(packet)
def provideService(self):
"""
......@@ -345,13 +338,11 @@ class Application(object):
# If not operational, send Stop Operation packets to storage
# nodes and client nodes. Abort connections to client nodes.
logging.critical('No longer operational, stopping the service')
for conn in em.getConnectionList():
node = nm.getByUUID(conn.getUUID())
if node is not None and (node.isStorage()
or node.isClient()):
conn.notify(Packets.StopOperation())
for node in self.nm.getIdentifiedList():
if node.isStorage() or node.isClient():
node.notify(Packets.StopOperation())
if node.isClient():
conn.abort()
node.getConnection().abort()
# Then, go back, and restart.
return
......@@ -457,16 +448,13 @@ class Application(object):
# change handlers
notification_packet = Packets.NotifyClusterInformation(state)
for conn in em.getConnectionList():
node = nm.getByUUID(conn.getUUID())
if conn.isListening() or node is None:
# not identified or listening, keep the identification handler
continue
for node in self.nm.getIdentifiedList():
if not node.isMaster():
conn.notify(notification_packet)
node.notify(notification_packet)
if node.isAdmin() or node.isMaster():
# those node types keep their own handler
continue
conn = node.getConnection()
if node.isClient():
if state != ClusterStates.RUNNING:
conn.close()
......@@ -514,8 +502,8 @@ class Application(object):
# corrected.
# change handler
handler = shutdown.ShutdownHandler(self)
for c in self.em.getConnectionList():
c.setHandler(handler)
for node in self.nm.getIdentifiedList():
node.getConnection().setHandler(handler)
# wait for all transaction to be finished
while True:
......@@ -525,15 +513,12 @@ class Application(object):
if self.cluster_state == ClusterStates.RUNNING:
sys.exit("Application has been asked to shut down")
logging.info("asking all nodes to shutdown")
for c in self.em.getConnectionList():
node = self.nm.getByUUID(c.getUUID())
if node is None:
continue
for node in self.nm.getIdentifiedList():
notification = Packets.NotifyNodeInformation([node.asTuple()])
if node.isClient():
c.notify(notification)
node.notify(notification)
elif node.isStorage() or node.isMaster():
c.notify(notification)
node.notify(notification)
# then shutdown
sys.exit("Cluster has been asked to shut down")
......
......@@ -86,24 +86,18 @@ class AdministrationHandler(MasterHandler):
if state == NodeStates.RUNNING:
# first make sure to have a connection to the node
node_conn = None
for node_conn in app.em.getConnectionList():
if node_conn.getUUID() == node.getUUID():
break
else:
# no connection to the node
if not node.isConnected():
raise ProtocolError('no connection to the node')
node.setState(state)
elif state == NodeStates.DOWN and node.isStorage():
# update it's state
node.setState(state)
for storage_conn in app.em.getConnectionListByUUID(uuid):
if node.isConnected():
# notify itself so it can shutdown
node_list = [node.asTuple()]
storage_conn.notify(Packets.NotifyNodeInformation(node_list))
node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
# close to avoid handle the closure as a connection lost
storage_conn.abort()
node.getConnection().abort()
# modify the partition table if required
cell_list = []
if modify_partition_table:
......@@ -152,10 +146,9 @@ class AdministrationHandler(MasterHandler):
node.setRunning()
app.broadcastNodesInformation(node_list)
# start nodes
for s_conn in em.getConnectionList():
if s_conn.getUUID() in uuid_set:
s_conn.notify(Packets.NotifyLastOID(app.loid))
s_conn.notify(Packets.StartOperation())
for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
node.notify(Packets.NotifyLastOID(app.loid))
node.notify(Packets.StartOperation())
# broadcast the new partition table
app.broadcastPartitionChanges(cell_list)
p = Errors.Ack('node added')
......
......@@ -76,10 +76,9 @@ class ClientServiceHandler(MasterHandler):
# build a new set as we may not send the message to all nodes as some
# might be not reachable at that time
used_uuid_set = set()
for c in app.em.getConnectionList():
if c.getUUID() in uuid_set:
c.ask(Packets.AskLockInformation(tid), timeout=60)
used_uuid_set.add(c.getUUID())
for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
node.ask(Packets.AskLockInformation(tid), timeout=60)
used_uuid_set.add(node.getUUID())
app.tm.prepare(tid, oid_list, used_uuid_set, conn.getPeerId())
......@@ -77,19 +77,16 @@ class StorageServiceHandler(BaseServiceHandler):
# Transaction Finished to the initiated client node,
# Invalidate Objects to the other client nodes, and Unlock
# Information to relevant storage nodes.
for c in app.em.getConnectionList():
uuid = c.getUUID()
if uuid is not None:
node = app.nm.getByUUID(uuid)
for node in self.app.nm.getIdentifiedList():
if node.isClient():
if node is t.getNode():
p = Packets.AnswerTransactionFinished(tid)
c.answer(p, msg_id=t.getMessageId())
node.answer(p, msg_id=t.getMessageId())
else:
c.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
node.notify(Packets.InvalidateObjects(t.getOIDList(), tid))
elif node.isStorage():
if uuid in t.getUUIDList():
c.notify(Packets.NotifyUnlockInformation(tid))
node.notify(Packets.NotifyUnlockInformation(tid))
# remove transaction from manager
tm.remove(tid)
......
......@@ -93,13 +93,10 @@ class VerificationManager(BaseServiceHandler):
logging.info('start to verify data')
# Gather all unfinished transactions.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
for node in self.app.nm.getIdentifiedList():
if node.isStorage():
self._uuid_dict[uuid] = False
conn.ask(Packets.AskUnfinishedTransactions())
self._uuid_dict[node.getUUID()] = False
node.ask(Packets.AskUnfinishedTransactions())
while True:
em.poll(1)
......@@ -116,40 +113,34 @@ class VerificationManager(BaseServiceHandler):
uuid_set = self.verifyTransaction(tid)
if uuid_set is None:
# Make sure that no node has this transaction.
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid is not None:
node = nm.getByUUID(uuid)
for node in self.app.nm.getIdentifiedList():
if node.isStorage():
conn.notify(Packets.DeleteTransaction(tid))
node.notify(Packets.DeleteTransaction(tid))
else:
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in uuid_set:
conn.ask(Packets.CommitTransaction(tid))
for node in self.app.nm.getIdentifiedList(pool_set=uuid_set):
node.ask(Packets.CommitTransaction(tid))
# If possible, send the packets now.
em.poll(0)
def verifyTransaction(self, tid):
em = self.app.em
nm = self.app.nm
uuid_set = set()
# Determine to which nodes I should ask.
partition = self.app.pt.getPartition(tid)
transaction_uuid_list = [cell.getUUID() for cell \
uuid_list = [cell.getUUID() for cell \
in self.app.pt.getCellList(partition, readable=True)]
if len(transaction_uuid_list) == 0:
if len(uuid_list) == 0:
raise VerificationFailure
uuid_set.update(transaction_uuid_list)
uuid_set.update(uuid_list)
# Gather OIDs.
self._uuid_dict = {}
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in transaction_uuid_list:
self._uuid_dict[uuid] = False
conn.ask(Packets.AskTransactionInformation(tid))
for node in self.app.nm.getIdentifiedList(pool_set=uuid_list):
self._uuid_dict[node.getUUID()] = False
node.ask(Packets.AskTransactionInformation(tid))
if len(self._uuid_dict) == 0:
raise VerificationFailure
......@@ -174,11 +165,9 @@ class VerificationManager(BaseServiceHandler):
uuid_set.update(object_uuid_list)
self._object_present = True
for conn in em.getConnectionList():
uuid = conn.getUUID()
if uuid in object_uuid_list:
self._uuid_dict[uuid] = False
conn.ask(Packets.AskObjectPresent(oid, tid))
for node in nm.getIdentifiedList(pool_set=object_uuid_list):
self._uuid_dict[node.getUUID()] = False
node.ask(Packets.AskObjectPresent(oid, tid))
while True:
em.poll(1)
......
......@@ -225,6 +225,10 @@ NODE_CLASS_MAPPING = {
class NodeManager(object):
"""This class manages node status."""
# TODO: rework getXXXList() methods, filter first by node type
# - getStorageList(identified=True, connected=True, )
# - getList(...)
def __init__(self):
self._node_set = set()
self._address_dict = {}
......
......@@ -185,9 +185,19 @@ class MasterNotificationsHandlerTests(MasterHandlerTests):
(NodeTypes.CLIENT, addr, self.getNewUUID(), NodeStates.UNKNOWN),
(NodeTypes.STORAGE, addr, self.getNewUUID(), NodeStates.DOWN),
]
# XXX: it might be better to test with real node & node manager
conn1, conn2 = Mock({'__repr__': 'conn1'}), Mock({'__repr__': 'conn2'})
self.app.em = Mock({'getConnectionListByUUID': ReturnValues([conn1],
[conn2])})
node1 = Mock({
'getConnection': conn1,
'__nonzero__': 1,
'isConnected': True,
})
node2 = Mock({
'getConnection': conn2,
'__nonzero__': 1,
'isConnected': True,
})
self.app.nm = Mock({'getByUUID': ReturnValues(node1, node2)})
self.app.cp = Mock()
self.handler.notifyNodeInformation(conn, node_list)
# node manager updated
......
......@@ -31,7 +31,7 @@ class MasterClientHandlerTests(NeoTestBase):
self.app = Application(config)
self.app.pt.clear()
self.app.pt.setID(pack('!Q', 1))
self.app.em = Mock({"getConnectionList" : []})
self.app.em = Mock()
self.app.loid = '\0' * 8
self.app.tm.setLastTID('\0' * 8)
self.service = ClientServiceHandler(self.app)
......@@ -117,7 +117,7 @@ class MasterClientHandlerTests(NeoTestBase):
oid_list = []
tid = self.app.tm.getLastTID()
conn = self.getFakeConnection(client_uuid, self.client_address)
self.app.em = Mock({"getConnectionList" : [conn, storage_conn]})
self.app.nm.getByUUID(storage_uuid).setConnection(storage_conn)
service.askFinishTransaction(conn, oid_list, tid)
self.checkAskLockInformation(storage_conn)
self.assertEquals(len(self.app.tm.getPendingList()), 1)
......
......@@ -43,7 +43,7 @@ class MasterClientElectionTests(NeoTestBase):
config = self.getMasterConfiguration(master_number=1)
self.app = Application(config)
self.app.pt.clear()
self.app.em = Mock({"getConnectionList" : []})
self.app.em = Mock()
self.app.uuid = self._makeUUID('M')
self.app.server = ('127.0.0.1', 10000)
self.app.name = 'NEOCLUSTER'
......@@ -209,7 +209,7 @@ class MasterServerElectionTests(NeoTestBase):
self.app = Application(config)
self.app.pt.clear()
self.app.name = 'NEOCLUSTER'
self.app.em = Mock({"getConnectionList" : []})
self.app.em = Mock()
self.election = ServerElectionHandler(self.app)
self.app.unconnected_master_node_set = set()
self.app.negotiating_master_node_set = set()
......
......@@ -50,18 +50,20 @@ class MasterAppTests(NeoTestBase):
def test_06_broadcastNodeInformation(self):
# defined some nodes to which data will be send
master_uuid = self.getNewUUID()
self.app.nm.createMaster(uuid=master_uuid)
master = self.app.nm.createMaster(uuid=master_uuid)
storage_uuid = self.getNewUUID()
storage = self.app.nm.createStorage(uuid=storage_uuid)
client_uuid = self.getNewUUID()
client = self.app.nm.createClient(uuid=client_uuid)
# create conn and patch em
master_conn = Mock()
storage_conn = Mock()
client_conn = Mock()
master.setConnection(master_conn)
storage.setConnection(storage_conn)
client.setConnection(client_conn)
self.app.nm.add(storage)
self.app.nm.add(client)
# create conn and patch em
master_conn = Mock({"getUUID" : master_uuid})
storage_conn = Mock({"getUUID" : storage_uuid})
client_conn = Mock({"getUUID" : client_uuid})
self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
# no address defined, not send to client node
c_node = self.app.nm.createClient(uuid = self.getNewUUID())
......@@ -72,10 +74,6 @@ class MasterAppTests(NeoTestBase):
self.checkNotifyNodeInformation(storage_conn)
# address defined and client type
master_conn = Mock({"getUUID" : master_uuid})
storage_conn = Mock({"getUUID" : storage_uuid})
client_conn = Mock({"getUUID" : client_uuid})
self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
s_node = self.app.nm.createClient(
uuid = self.getNewUUID(),
address=("127.1.0.1", 3361)
......@@ -87,10 +85,6 @@ class MasterAppTests(NeoTestBase):
self.checkNotifyNodeInformation(storage_conn)
# address defined and storage type
master_conn = Mock({"getUUID" : master_uuid})
storage_conn = Mock({"getUUID" : storage_uuid})
client_conn = Mock({"getUUID" : client_uuid})
self.app.em = Mock({"getConnectionList" : (master_conn, storage_conn, client_conn)})
s_node = self.app.nm.createStorage(
uuid=self.getNewUUID(),
address=("127.0.0.1", 1351)
......
......@@ -34,7 +34,7 @@ class MasterStorageHandlerTests(NeoTestBase):
self.app = Application(config)
self.app.pt.clear()
self.app.pt.setID(pack('!Q', 1))
self.app.em = Mock({"getConnectionList" : []})
self.app.em = Mock()
self.service = StorageServiceHandler(self.app)
self.client_handler = ClientServiceHandler(self.app)
# define some variable to simulate client and storage node
......@@ -72,6 +72,7 @@ class MasterStorageHandlerTests(NeoTestBase):
node = nm.createFromNodeType(node_type, address=(ip, port),
uuid=uuid)
conn = self.getFakeConnection(node.getUUID(),node.getAddress())
node.setConnection(conn)
return (node, conn)
def test_answerInformationLocked_1(self):
......@@ -106,7 +107,6 @@ class MasterStorageHandlerTests(NeoTestBase):
# a faked event manager
connection_list = [client_conn_1, client_conn_2, storage_conn_1,
storage_conn_2]
self.app.em = Mock({"getConnectionList" : connection_list})
# register a transaction
tid = self.app.tm.begin(client_1, None)
self.app.tm.prepare(tid, oid_list, uuid_list, msg_id)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment