Commit 6fcbdc65 authored by Aurel's avatar Aurel

defined more handler for connection management


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@71 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent f465ee36
...@@ -30,8 +30,9 @@ class ConnectionManager(object): ...@@ -30,8 +30,9 @@ class ConnectionManager(object):
self.connection_lock_acquire = l.acquire self.connection_lock_acquire = l.acquire
self.connection_lock_release = l.release self.connection_lock_release = l.release
def _initNodeConnection(self, addr): def _initNodeConnection(self, node):
"""Init a connection to a given storage node.""" """Init a connection to a given storage node."""
addr = node.getServer()
handler = ClientEventHandler(self.storage) handler = ClientEventHandler(self.storage)
conn = ClientConnection(self.storage.em, handler, addr) conn = ClientConnection(self.storage.em, handler, addr)
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -42,8 +43,17 @@ class ConnectionManager(object): ...@@ -42,8 +43,17 @@ class ConnectionManager(object):
self.storage.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.storage.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.storage.local_var.storage_node = None self.storage.local_var.storage_node = None
self.storage._waitMessage() self.storage._waitMessage()
if self.storage.storage_node is None: if self.storage.storage_node == -1:
raise NEOStorageError('Connection to storage node failed') # Connection failed, notify primary master node
logging.error('Connection to storage node %s failed' %(addr,))
conn = self.storage.master_conn
msg_id = conn.getNextId()
p = Packet()
node_list = [(STORAGE_NODE_TYPE, addr[0], addr[1], node.getUUID(),
TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
self.storage.queue.put((None, msg_id, conn, p), True)
return None
logging.debug('connected to storage node %s' %(addr,)) logging.debug('connected to storage node %s' %(addr,))
return conn return conn
...@@ -62,7 +72,9 @@ class ConnectionManager(object): ...@@ -62,7 +72,9 @@ class ConnectionManager(object):
if self.pool_size > self.max_pool_size: if self.pool_size > self.max_pool_size:
# must drop some unused connections # must drop some unused connections
self.dropConnection() self.dropConnection()
conn = self._initNodeConnection(node.getServer()) conn = self._initNodeConnection(node)
if conn is None:
return None
# add node to node manager # add node to node manager
if not self.storage.nm.hasNode(node): if not self.storage.nm.hasNode(node):
n = StorageNode(node.getServer()) n = StorageNode(node.getServer())
...@@ -82,6 +94,11 @@ class ConnectionManager(object): ...@@ -82,6 +94,11 @@ class ConnectionManager(object):
# Create new connection to node # Create new connection to node
return self._createNodeConnection(node) return self._createNodeConnection(node)
def removeConnection(self, node):
"""Explicitly remove connection when a node is broken."""
if self.connection_dict.has_key(node.getUUID()):
self.connection_dict.pop(node.getUUID())
class Application(ThreadingMixIn, object): class Application(ThreadingMixIn, object):
"""The client node application.""" """The client node application."""
...@@ -177,6 +194,7 @@ class Application(ThreadingMixIn, object): ...@@ -177,6 +194,7 @@ class Application(ThreadingMixIn, object):
self.nm.add(n) self.nm.add(n)
# Connect to defined master node and get primary master node # Connect to defined master node and get primary master node
self.local_var.tmp_q = Queue(1)
if self.primary_master_node is None: if self.primary_master_node is None:
conn = ClientConnection(self.em, handler, defined_master_addr) conn = ClientConnection(self.em, handler, defined_master_addr)
msg_id = conn.getNextId() msg_id = conn.getNextId()
...@@ -185,13 +203,14 @@ class Application(ThreadingMixIn, object): ...@@ -185,13 +203,14 @@ class Application(ThreadingMixIn, object):
defined_master_addr[0], defined_master_addr[0],
defined_master_addr[1], self.name) defined_master_addr[1], self.name)
# send message to dispatcher # send message to dispatcher
self.local_var.tmp_q = Queue(1)
self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True) self.queue.put((self.local_var.tmp_q, msg_id, conn, p), True)
self.primary_master_node = None self.primary_master_node = None
self.node_not_ready = 0 self.node_not_ready = 0
while 1: while 1:
self._waitMessage() self._waitMessage()
if self.primary_master_node == -1:
raise NEOStorageError("Unable to initialize connection to master node %s" %(defined_master_addr,))
if self.primary_master_node is not None: if self.primary_master_node is not None:
break break
if self.node_not_ready: if self.node_not_ready:
...@@ -199,7 +218,7 @@ class Application(ThreadingMixIn, object): ...@@ -199,7 +218,7 @@ class Application(ThreadingMixIn, object):
return return
logging.debug('primary master node is %s' %(self.primary_master_node.server,)) logging.debug('primary master node is %s' %(self.primary_master_node.server,))
# Close connection if not already connected to primary master node # Close connection if not already connected to primary master node
if self.primary_master_node.server != defined_master_addr: if self.primary_master_node.getServer() != defined_master_addr:
for conn in self.em.getConnectionList(): for conn in self.em.getConnectionList():
if not isinstance(conn, ListeningConnection): if not isinstance(conn, ListeningConnection):
conn.close() conn.close()
...@@ -273,6 +292,8 @@ class Application(ThreadingMixIn, object): ...@@ -273,6 +292,8 @@ class Application(ThreadingMixIn, object):
# Store data on each node # Store data on each node
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askObject(msg_id, oid, serial, tid) p.askObject(msg_id, oid, serial, tid)
...@@ -388,6 +409,8 @@ class Application(ThreadingMixIn, object): ...@@ -388,6 +409,8 @@ class Application(ThreadingMixIn, object):
checksum = adler32(compressed_data) checksum = adler32(compressed_data)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data) p.askStoreObject(msg_id, oid, serial, 1, checksum, compressed_data)
...@@ -423,6 +446,8 @@ class Application(ThreadingMixIn, object): ...@@ -423,6 +446,8 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askStoreTransaction(msg_id, self.tid, user, desc, ext, oid_list) p.askStoreTransaction(msg_id, self.tid, user, desc, ext, oid_list)
...@@ -456,6 +481,8 @@ class Application(ThreadingMixIn, object): ...@@ -456,6 +481,8 @@ class Application(ThreadingMixIn, object):
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.abortTransaction(msg_id, self.tid) p.abortTransaction(msg_id, self.tid)
...@@ -468,6 +495,8 @@ class Application(ThreadingMixIn, object): ...@@ -468,6 +495,8 @@ class Application(ThreadingMixIn, object):
for storage_node in storage_node_list: for storage_node in storage_node_list:
if not aborted_node.has_key(storage_node): if not aborted_node.has_key(storage_node):
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.abortTransaction(msg_id, self.tid) p.abortTransaction(msg_id, self.tid)
...@@ -520,6 +549,8 @@ class Application(ThreadingMixIn, object): ...@@ -520,6 +549,8 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askTransactionInformation(msg_id, tid) p.askTransactionInformation(msg_id, tid)
...@@ -566,6 +597,8 @@ class Application(ThreadingMixIn, object): ...@@ -566,6 +597,8 @@ class Application(ThreadingMixIn, object):
self.local_var.tmp_q = Queue(len(storage_node_list)) self.local_var.tmp_q = Queue(len(storage_node_list))
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askTIDs(msg_id, first, last) p.askTIDs(msg_id, first, last)
...@@ -591,6 +624,8 @@ class Application(ThreadingMixIn, object): ...@@ -591,6 +624,8 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askTransactionInformation(msg_id, tid) p.askTransactionInformation(msg_id, tid)
...@@ -627,6 +662,8 @@ class Application(ThreadingMixIn, object): ...@@ -627,6 +662,8 @@ class Application(ThreadingMixIn, object):
if x.getState() == UP_TO_DATE_STATE] if x.getState() == UP_TO_DATE_STATE]
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askObjectHistory(msg_id, oid, length) p.askObjectHistory(msg_id, oid, length)
...@@ -653,6 +690,8 @@ class Application(ThreadingMixIn, object): ...@@ -653,6 +690,8 @@ class Application(ThreadingMixIn, object):
storage_node_list = self.pt.getCellList(partition_id, True) storage_node_list = self.pt.getCellList(partition_id, True)
for storage_node in storage_node_list: for storage_node in storage_node_list:
conn = self.cm.getConnForNode(storage_node.getUUID()) conn = self.cm.getConnForNode(storage_node.getUUID())
if conn is None:
continue
msg_id = conn.getNextId() msg_id = conn.getNextId()
p = Packet() p = Packet()
p.askTransactionInformation(msg_id, serial) p.askTransactionInformation(msg_id, serial)
......
...@@ -9,7 +9,7 @@ from neo.pt import PartitionTable ...@@ -9,7 +9,7 @@ from neo.pt import PartitionTable
from ZODB.TimeStamp import TimeStamp from ZODB.TimeStamp import TimeStamp
from ZODB.utils import p64 from ZODB.utils import p64
from thread import get_ident
class ClientEventHandler(EventHandler): class ClientEventHandler(EventHandler):
"""This class deals with events for a master.""" """This class deals with events for a master."""
...@@ -20,9 +20,92 @@ class ClientEventHandler(EventHandler): ...@@ -20,9 +20,92 @@ class ClientEventHandler(EventHandler):
EventHandler.__init__(self) EventHandler.__init__(self)
def packetReceived(self, conn, packet): def packetReceived(self, conn, packet):
logging.debug("received packet id %s" %(packet.getId(),)) """Redirect all received packet to dispatcher thread."""
self.dispatcher.message = conn, packet self.dispatcher.message = conn, packet
def connectionFailed(self, conn):
app = self.app
uuid = conn.getUUID()
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif uuid == self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node failed")
raise NEOStorageError("connection to primary master node failed")
else:
# Connection to a storage node failed
app.storage_node = -1
EventHandler.connectionFailed(self, conn)
def connectionClosed(self, conn):
uuid = conn.getUUID()
if self.app.primary_master_node is None:
EventHandler.connectionClosed(self, conn)
if uuid == self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node closed")
raise NEOStorageError("connection to primary master node closed")
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down
conn = app.master_conn
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
app.queue.put((None, msg_id, conn, p), True)
# Remove from pool connection
app.cm.removeConnection(node)
EventHandler.connectionClosed(self, conn)
def timeoutExpired(self, conn):
uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired")
raise NEOStorageError("connection timeout to primary master node expired")
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is temporarily down
conn = app.master_conn
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
TEMPORARILY_DOWN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
app.queue.put((None, msg_id, conn, p), True)
# Remove from pool connection
app.cm.removeConnection(node)
EventHandler.timeoutExpired(self, conn)
def peerBroken(self, conn):
uuid = conn.getUUID()
if uuid == self.app.primary_master_node.getUUID():
logging.critical("primary master node is broken")
raise NEOStorageError("primary master node is broken")
else:
app = self.app
node = app.nm.getNodeByUUID(uuid)
if isinstance(node, StorageNode):
# Notify primary master node that a storage node is broken
conn = app.master_conn
msg_id = conn.getNextId()
p = Packet()
ip_address, port = node.getServer()
node_list = [(STORAGE_NODE_TYPE, ip_address, port, node.getUUID(),
BROKEN_STATE),]
p.notifyNodeInformation(msg_id, node_list)
app.queue.put((None, msg_id, conn, p), True)
# Remove from pool connection
app.cm.removeConnection(node)
EventHandler.peerBroken(self, conn)
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
if isinstance(conn, ClientConnection): if isinstance(conn, ClientConnection):
app = self.app app = self.app
...@@ -179,8 +262,8 @@ class ClientEventHandler(EventHandler): ...@@ -179,8 +262,8 @@ class ClientEventHandler(EventHandler):
else: else:
continue continue
app.app.nm.add(n) app.app.nm.add(n)
n.setState(state)
n.setState(state)
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
...@@ -233,6 +316,13 @@ class ClientEventHandler(EventHandler): ...@@ -233,6 +316,13 @@ class ClientEventHandler(EventHandler):
else: else:
self.handleUnexpectedPacket(conn, packet) self.handleUnexpectedPacket(conn, packet)
def handleStopOperation(self, conn, packet):
if isinstance(conn, ClientConnection):
raise NEOStorageError('operation stopped')
else:
self.handleUnexpectedPacket(conn, packet)
# Storage node handler # Storage node handler
def handleAnwserObject(self, conn, packet, oid, start_serial, end_serial, compression, def handleAnwserObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data): checksum, data):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment