Commit 3593c1aa authored by Vincent Pelletier's avatar Vincent Pelletier

Remove all "isinstance(conn, MTClientConnection)" tests from client handler to...

Remove all "isinstance(conn, MTClientConnection)" tests from client handler to make it unit-test-friendly: Client node does not have any listening socket, so there cannot be any (MT)ServerConnection instance used to reach handler. Also, Multi-Thread safety is not a runtime but a static criterion (ie, it is most likely a bug to use non-multi-thread-safe Connection subclasses in client). So this test is an always-true check, and can be removed.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@307 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 241b4eb2
...@@ -158,340 +158,283 @@ class ClientEventHandler(EventHandler): ...@@ -158,340 +158,283 @@ class ClientEventHandler(EventHandler):
def handleNotReady(self, conn, packet, message): def handleNotReady(self, conn, packet, message):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.local_var.node_not_ready = 1
app.local_var.node_not_ready = 1
else:
self.handleUnexpectedPacket(conn, packet)
def handleAcceptNodeIdentification(self, conn, packet, node_type, def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, uuid, ip_address, port,
num_partitions, num_replicas): num_partitions, num_replicas):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app node = app.nm.getNodeByServer(conn.getAddress())
node = app.nm.getNodeByServer(conn.getAddress()) # It can be eiter a master node or a storage node
# It can be eiter a master node or a storage node if node_type == CLIENT_NODE_TYPE:
if node_type == CLIENT_NODE_TYPE: conn.close()
conn.close() return
return if conn.getAddress() != (ip_address, port):
if conn.getAddress() != (ip_address, port): # The server address is different! Then why was
# The server address is different! Then why was # the connection successful?
# the connection successful? logging.error('%s:%d is waiting for %s:%d',
logging.error('%s:%d is waiting for %s:%d', conn.getAddress()[0], conn.getAddress()[1],
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
ip_address, port) app.nm.remove(node)
app.nm.remove(node) conn.close()
conn.close() return
return
conn.setUUID(uuid)
conn.setUUID(uuid) node.setUUID(uuid)
node.setUUID(uuid)
if node_type == MASTER_NODE_TYPE:
if node_type == MASTER_NODE_TYPE: # Create partition table if necessary
# Create partition table if necessary if app.pt is None:
if app.pt is None: app.pt = PartitionTable(num_partitions, num_replicas)
app.pt = PartitionTable(num_partitions, num_replicas) app.num_partitions = num_partitions
app.num_partitions = num_partitions app.num_replicas = num_replicas
app.num_replicas = num_replicas
# Ask a primary master.
# Ask a primary master. conn.lock()
conn.lock() try:
try: msg_id = conn.getNextId()
msg_id = conn.getNextId() p = Packet()
p = Packet() p.askPrimaryMaster(msg_id)
p.askPrimaryMaster(msg_id) conn.addPacket(p)
conn.addPacket(p) conn.expectMessage(msg_id)
conn.expectMessage(msg_id) self.dispatcher.register(conn, msg_id, app.getQueue())
self.dispatcher.register(conn, msg_id, app.getQueue()) finally:
finally: conn.unlock()
conn.unlock() elif node_type == STORAGE_NODE_TYPE:
elif node_type == STORAGE_NODE_TYPE: app.storage_node = node
app.storage_node = node
else:
self.handleUnexpectedPacket(conn, packet)
# Master node handler # Master node handler
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list): def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
if isinstance(conn, MTClientConnection): uuid = conn.getUUID()
uuid = conn.getUUID() if uuid is None:
if uuid is None: self.handleUnexpectedPacket(conn, packet)
self.handleUnexpectedPacket(conn, packet) return
return
app = self.app
app = self.app node = app.nm.getNodeByUUID(uuid)
node = app.nm.getNodeByUUID(uuid) # This must be sent only by primary master node
# This must be sent only by primary master node if node.getNodeType() != MASTER_NODE_TYPE:
if node.getNodeType() != MASTER_NODE_TYPE: return
return # Register new master nodes.
# Register new master nodes. for ip_address, port, uuid in known_master_list:
for ip_address, port, uuid in known_master_list: addr = (ip_address, port)
addr = (ip_address, port) n = app.nm.getNodeByServer(addr)
n = app.nm.getNodeByServer(addr) if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary_master_node = primary_node
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if node_type == MASTER_NODE_TYPE:
n = nm.getNodeByServer(addr)
if n is None: if n is None:
n = MasterNode(server = addr) n = MasterNode(server = addr)
app.nm.add(n) nm.add(n)
if uuid != INVALID_UUID: if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer # If I don't know the UUID yet, believe what the peer
# told me at the moment. # told me at the moment.
if n.getUUID() is None: if n.getUUID() is None:
n.setUUID(uuid) n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if primary_uuid != INVALID_UUID: if uuid == INVALID_UUID:
# The primary master is defined. # No interest.
if app.primary_master_node is not None \ continue
and app.primary_master_node.getUUID() != primary_uuid: n = nm.getNodeByUUID(uuid)
# There are multiple primary master nodes. This is if n is None:
# dangerous. n = StorageNode(server = addr, uuid = uuid)
raise ElectionFailure, 'multiple primary master nodes' nm.add(n)
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else: else:
if primary_node.getUUID() == primary_uuid: n.setServer(addr)
# Whatever the situation is, I trust this master. elif node_type == CLIENT_NODE_TYPE:
app.primary_master_node = primary_node continue
else:
self.handleUnexpectedPacket(conn, packet)
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
if isinstance(conn, MTClientConnection):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
pt = app.pt
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
if app.ptid != ptid:
app.ptid = ptid
pt.clear()
for offset, row in row_list:
for uuid, state in row:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyNodeInformation(self, conn, packet, node_list): n.setState(state)
if isinstance(conn, MTClientConnection):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if node_type == MASTER_NODE_TYPE:
n = nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
else:
n.setServer(addr)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app nm = app.nm
nm = app.nm pt = app.pt
pt = app.pt uuid = conn.getUUID()
uuid = conn.getUUID() if uuid is None:
if uuid is None: self.handleUnexpectedPacket(conn, packet)
self.handleUnexpectedPacket(conn, packet) return
return
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
if app.ptid >= ptid:
# Ignore this packet.
return
app.ptid = ptid app = self.app
for offset, uuid, state in cell_list: node = app.nm.getNodeByUUID(uuid)
node = nm.getNodeByUUID(uuid) # This must be sent only by primary master node
if node is None: if node.getNodeType() != MASTER_NODE_TYPE \
node = StorageNode(uuid = uuid) or app.primary_master_node is None \
if uuid != app.uuid: or app.primary_master_node.getUUID() != uuid:
node.setState(TEMPORARILY_DOWN_STATE) return
nm.add(node)
if app.ptid >= ptid:
# Ignore this packet.
return
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state) pt.setCell(offset, node, state)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNewTID(self, conn, packet, tid): def handleAnswerNewTID(self, conn, packet, tid):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.tid = tid
app.tid = tid
else:
self.handleUnexpectedPacket(conn, packet)
def handleNotifyTransactionFinished(self, conn, packet, tid): def handleNotifyTransactionFinished(self, conn, packet, tid):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app if tid != app.tid:
if tid != app.tid: app.txn_finished = -1
app.txn_finished = -1
else:
app.txn_finished = 1
else: else:
self.handleUnexpectedPacket(conn, packet) app.txn_finished = 1
def handleInvalidateObjects(self, conn, packet, oid_list, tid): def handleInvalidateObjects(self, conn, packet, oid_list, tid):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app._cache_lock_acquire()
app._cache_lock_acquire() try:
try: # ZODB required a dict with oid as key, so create it
# ZODB required a dict with oid as key, so create it oids = {}
oids = {} for oid in oid_list:
for oid in oid_list: oids[oid] = tid
oids[oid] = tid try:
try: del app.mq_cache[oid]
del app.mq_cache[oid] except KeyError:
except KeyError: pass
pass if app._db is not None:
if app._db is not None: app._db.invalidate(tid, oids)
app._db.invalidate(tid, oids) finally:
finally: app._cache_lock_release()
app._cache_lock_release()
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerNewOIDs(self, conn, packet, oid_list): def handleAnswerNewOIDs(self, conn, packet, oid_list):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.new_oid_list = oid_list
app.new_oid_list = oid_list app.new_oid_list.reverse()
app.new_oid_list.reverse()
else:
self.handleUnexpectedPacket(conn, packet)
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
if isinstance(conn, MTClientConnection): logging.critical("master node ask to stop operation")
logging.critical("master node ask to stop operation")
else:
self.handleUnexpectedPacket(conn, packet)
# Storage node handler # Storage node handler
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial, compression, def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data): checksum, data):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.local_var.asked_object = (oid, start_serial, end_serial, compression,
app.local_var.asked_object = (oid, start_serial, end_serial, compression, checksum, data)
checksum, data)
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial): def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app if conflicting:
if conflicting: app.txn_object_stored = -1, serial
app.txn_object_stored = -1, serial
else:
app.txn_object_stored = oid, serial
else: else:
self.handleUnexpectedPacket(conn, packet) app.txn_object_stored = oid, serial
def handleAnswerStoreTransaction(self, conn, packet, tid): def handleAnswerStoreTransaction(self, conn, packet, tid):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.txn_voted = 1
app.txn_voted = 1
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTransactionInformation(self, conn, packet, tid, def handleAnswerTransactionInformation(self, conn, packet, tid,
user, desc, ext, oid_list): user, desc, ext, oid_list):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app # transaction information are returned as a dict
# transaction information are returned as a dict info = {}
info = {} info['time'] = TimeStamp(tid).timeTime()
info['time'] = TimeStamp(tid).timeTime() info['user_name'] = user
info['user_name'] = user info['description'] = desc
info['description'] = desc info['id'] = tid
info['id'] = tid info['oids'] = oid_list
info['oids'] = oid_list app.local_var.txn_info = info
app.local_var.txn_info = info
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerObjectHistory(self, conn, packet, oid, history_list): def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app # history_list is a list of tuple (serial, size)
# history_list is a list of tuple (serial, size) app.local_var.history = oid, history_list
app.local_var.history = oid, history_list
else:
self.handleUnexpectedPacket(conn, packet)
def handleOidNotFound(self, conn, packet, message): def handleOidNotFound(self, conn, packet, message):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app # This can happen either when :
# This can happen either when : # - loading an object
# - loading an object # - asking for history
# - asking for history app.local_var.asked_object = -1
app.local_var.asked_object = -1 app.local_var.history = -1
app.local_var.history = -1
else:
self.handleUnexpectedPacket(conn, packet)
def handleTidNotFound(self, conn, packet, message): def handleTidNotFound(self, conn, packet, message):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app # This can happen when requiring txn informations
# This can happen when requiring txn informations app.local_var.txn_info = -1
app.local_var.txn_info = -1
else:
self.handleUnexpectedPacket(conn, packet)
def handleAnswerTIDs(self, conn, packet, tid_list): def handleAnswerTIDs(self, conn, packet, tid_list):
if isinstance(conn, MTClientConnection): app = self.app
app = self.app app.local_var.node_tids[conn.getUUID()] = tid_list
app.local_var.node_tids[conn.getUUID()] = tid_list
else:
self.handleUnexpectedPacket(conn, packet)
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment