Commit d1226479 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split client handlers in four. Some bug fixes included and client no more add

feeding nodes in the partition table. Updated tests will follow.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@476 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d8ebfbb6
......@@ -31,7 +31,7 @@ from neo.protocol import Packet, INVALID_UUID, INVALID_TID, INVALID_PARTITION, \
INVALID_PTID, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
UP_TO_DATE_STATE, FEEDING_STATE, INVALID_SERIAL
from neo.client.handler import ClientEventHandler, ClientAnswerEventHandler
from neo.client.handler import *
from neo.client.exception import NEOStorageError, NEOStorageConflictError, \
NEOStorageNotFoundError
from neo.util import makeChecksum, dump
......@@ -71,10 +71,11 @@ class ConnectionPool(object):
app = self.app
# Loop until a connection is obtained.
while 1:
while True:
logging.info('trying to connect to %s', node)
app.setNodeReady()
conn = MTClientConnection(app.em, app.handler, addr,
handler = StorageBootstrapEventHandler(app, app.dispatcher)
conn = MTClientConnection(app.em, handler, addr,
connector_handler=app.connector_handler)
conn.lock()
try:
......@@ -95,21 +96,22 @@ class ConnectionPool(object):
conn.unlock()
try:
app._waitMessage(conn, msg_id)
app._waitMessage(conn, msg_id, handler=handler)
except NEOStorageError:
logging.error('Connection to storage node %s failed', node)
return None
if app.isNodeReady():
logging.info('connected to storage node %s', node)
# FIXME: remove this assertion
assert conn.getUUID() != INVALID_UUID
conn.setHandler(self.app.storage_handler)
return conn
else:
# Connection failed, notify primary master node
logging.info('Storage node %s not ready', node)
return None
sleep(1)
def _dropConnections(self):
"""Drop connections."""
for node_uuid, conn in self.connection_dict.items():
......@@ -206,8 +208,10 @@ class Application(object):
self.ptid = INVALID_PTID
self.num_replicas = 0
self.num_partitions = 0
self.handler = ClientEventHandler(self, self.dispatcher)
self.answer_handler = ClientAnswerEventHandler(self, self.dispatcher)
#self.handler = ClientEventHandler(self, self.dispatcher)
self.primary_handler = PrimaryEventHandler(self, self.dispatcher)
self.storage_handler = StorageEventHandler(self, self.dispatcher)
#self.answer_handler = ClientAnswerEventHandler(self, self.dispatcher)
# Transaction specific variable
self.tid = None
self.txn = None
......@@ -248,27 +252,30 @@ class Application(object):
self.local_var.queue = Queue(5)
return self.local_var.queue
def _waitMessage(self, target_conn = None, msg_id = None):
def _waitStorageMessage(self, target_conn=None, msg_id=None):
self._waitMessage(target_conn, msg_id, self.storage_handler)
def _waitPrimaryMessage(self, target_conn=None, msg_id=None):
self._waitMessage(target_conn, msg_id, self.primary_handler)
def _waitMessage(self, target_conn = None, msg_id = None, handler=None):
"""Wait for a message returned by the dispatcher in queues."""
local_queue = self.getQueue()
while 1:
if msg_id is None:
try:
if msg_id is None:
conn, packet = local_queue.get_nowait()
except Empty:
break
else:
conn, packet = local_queue.get()
except Empty:
break
if packet is None:
if conn is target_conn:
raise NEOStorageError('connection closed')
else:
continue
self.answer_handler.dispatch(conn, packet)
handler.dispatch(conn, packet)
if target_conn is conn and msg_id == packet.getId() \
and packet.getType() & 0x8000:
break
......@@ -300,7 +307,7 @@ class Application(object):
finally:
conn.unlock()
self._waitMessage(conn, msg_id)
self._waitPrimaryMessage(conn, msg_id)
if len(self.new_oid_list) <= 0:
raise NEOStorageError('new_oid failed')
return self.new_oid_list.pop()
......@@ -356,7 +363,7 @@ class Application(object):
finally:
conn.unlock()
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.local_var.asked_object == -1:
# OID not found
break
......@@ -461,7 +468,7 @@ class Application(object):
finally:
conn.unlock()
# Wait for answer
self._waitMessage(conn, msg_id)
self._waitPrimaryMessage(conn, msg_id)
if self.tid is None:
raise NEOStorageError('tpc_begin failed')
else:
......@@ -487,7 +494,7 @@ class Application(object):
compressed_data = compress(data)
checksum = makeChecksum(compressed_data)
for cell in cell_list:
logging.info("storing object %s %s" %(cell.getServer(),cell.getState()))
#logging.info("storing object %s %s" %(cell.getServer(),cell.getState()))
conn = self.cp.getConnForNode(cell)
if conn is None:
continue
......@@ -505,7 +512,7 @@ class Application(object):
conn.unlock()
# Check we don't get any conflict
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.txn_object_stored[0] == -1:
if self.txn_data_dict.has_key(oid):
# One storage already accept the object, is it normal ??
......@@ -552,7 +559,7 @@ class Application(object):
finally:
conn.unlock()
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if not self.isTransactionVoted():
raise NEOStorageError('tpc_vote failed')
......@@ -625,7 +632,8 @@ class Application(object):
conn.unlock()
# Wait for answer
self._waitMessage(conn, msg_id)
self._waitPrimaryMessage(conn, msg_id)
if not self.isTransactionFinished():
raise NEOStorageError('tpc_finish failed')
......@@ -643,7 +651,6 @@ class Application(object):
finally:
self._load_lock_release()
def undo(self, transaction_id, txn, wrapper):
if txn is not self.txn:
raise StorageTransactionError(self, transaction_id)
......@@ -669,7 +676,7 @@ class Application(object):
conn.unlock()
# Wait for answer
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1:
# Tid not found, try with next node
continue
......@@ -744,7 +751,7 @@ class Application(object):
# Wait for answers from all storages.
# FIXME this is a busy loop.
while True:
self._waitMessage()
self._waitStorageMessage()
if len(self.local_var.node_tids.keys()) == len(storage_node_list):
break
......@@ -778,7 +785,7 @@ class Application(object):
conn.unlock()
# Wait for answer
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1:
# TID not found, go on with next node
continue
......@@ -829,7 +836,7 @@ class Application(object):
finally:
conn.unlock()
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.local_var.history == -1:
# Not found, go on with next node
continue
......@@ -867,7 +874,7 @@ class Application(object):
conn.unlock()
# Wait for answer
self._waitMessage(conn, msg_id)
self._waitStorageMessage(conn, msg_id)
if self.local_var.txn_info == -1:
# TID not found
continue
......@@ -894,23 +901,25 @@ class Application(object):
close = __del__
def sync(self):
self._waitMessage()
self._waitStorageMessage()
def connectToPrimaryMasterNode(self):
"""Connect to a primary master node.
This can be called either at bootstrap or when
client got disconnected during process"""
# Indicate we are trying to connect to avoid multiple try a time
acquired = self._connecting_to_master_node_acquire(0)
if acquired:
self.master_conn = None
logging.debug('connecting to primary master...')
# acquire the lock to allow only one thread to connect to the primary
lock = self._connecting_to_master_node_acquire(1)
try:
if self.master_conn is not None:
# another thread has done the job
logging.debug('already connected')
return
if self.pt is not None:
self.pt.clear()
master_index = 0
conn = None
# Make application execute remaining message if any
self._waitMessage()
while 1:
self._waitStorageMessage()
while True:
self.setNodeReady()
if self.primary_master_node is None:
# Try with master node defined in config
......@@ -923,7 +932,9 @@ class Application(object):
else:
addr, port = self.primary_master_node.getServer()
# Request Node Identification
conn = MTClientConnection(self.em, self.handler, (addr, port), connector_handler=self.connector_handler)
handler = PrimaryBoostrapEventHandler(self, self.dispatcher)
conn = MTClientConnection(self.em, handler, (addr, port),
connector_handler=self.connector_handler)
if self.nm.getNodeByServer((addr, port)) is None:
n = MasterNode(server = (addr, port))
self.nm.add(n)
......@@ -934,8 +945,6 @@ class Application(object):
p = Packet()
p.requestNodeIdentification(msg_id, CLIENT_NODE_TYPE, self.uuid,
'0.0.0.0', 0, self.name)
# Send message
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, self.getQueue())
......@@ -944,7 +953,7 @@ class Application(object):
# Wait for answer
while 1:
self._waitMessage()
self._waitMessage(handler=handler)
# Now check result
if self.primary_master_node is not None:
if self.primary_master_node == -1:
......@@ -967,7 +976,9 @@ class Application(object):
sleep(1)
logging.info("connected to primary master node %s" % self.primary_master_node)
conn.setHandler(PrimaryEventHandler(self, self.dispatcher))
self.master_conn = conn
finally:
self._connecting_to_master_node_release()
......
......@@ -22,7 +22,7 @@ from neo.connection import MTClientConnection
from neo.protocol import Packet, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
BROKEN_STATE
BROKEN_STATE, FEEDING_STATE, DISCARDED_STATE
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
from neo.client.exception import NEOStorageError
......@@ -50,14 +50,8 @@ class BaseClientEventHandler(EventHandler):
finally:
conn.release()
class ClientEventHandler(BaseClientEventHandler):
"""This class discriminates answers from non-answers, queues answer to
their requester and handles non-answers directly."""
def packetReceived(self, conn, packet):
"""Redirect all received packet to dispatcher thread."""
logging.debug('packet %d:%x received from %s:%d',
packet.getId(), packet.getType(), *(conn.getAddress()))
queue = self.dispatcher.getQueue(conn, packet)
if queue is None:
self.dispatch(conn, packet)
......@@ -94,93 +88,175 @@ class ClientEventHandler(BaseClientEventHandler):
finally:
conn.unlock()
def connectionFailed(self, conn):
app = self.app
uuid = conn.getUUID()
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif self.app.primary_master_node is not None and uuid == \
self.app.primary_master_node.getUUID():
logging.critical("connection to primary master node failed")
app.connectToPrimaryMasterNode()
else:
# Connection to a storage node failed
node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE:
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(ClientEventHandler, self).connectionFailed(conn)
class PrimaryBoostrapEventHandler(BaseClientEventHandler):
# Bootstrap handler used when looking for the primary master
def connectionFailed(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).connectionFailed(conn)
def connectionClosed(self, conn):
uuid = conn.getUUID()
app = self.app
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif app.master_conn is not None and uuid == app.master_conn.getUUID():
logging.critical("connection to primary master node closed")
# Close connection
app.master_conn.close()
app.master_conn = None
app.primary_master_node = None
app.connectToPrimaryMasterNode()
else:
node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE:
# Notify primary master node that a storage node is temporarily down
logging.info("connection to storage node %s closed",
node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).connectionClosed(conn)
super(ClientEventHandler, self).connectionClosed(conn)
def peerBroken(self, conn):
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).peerBroken(conn)
def timeoutExpired(self, conn):
uuid = conn.getUUID()
if self.app.primary_master_node is None:
self.app.primary_master_node = -1
super(PrimaryBoostrapEventHandler, self).timeoutExpired(conn)
def handleNotReady(self, conn, packet, message):
self.app.setNodeNotReady()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
app = self.app
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("connection timeout to primary master node expired")
app.connectToPrimaryMasterNode()
else:
node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE:
# Notify primary master node that a storage node is
# temporarily down.
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
# this must be a master node
if node_type != MASTER_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
app.nm.remove(node)
conn.close()
return
super(ClientEventHandler, self).timeoutExpired(conn)
conn.setUUID(uuid)
node.setUUID(uuid)
def peerBroken(self, conn):
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
app.num_partitions = num_partitions
app.num_replicas = num_replicas
# Ask a primary master.
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.askPrimaryMaster(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
if app.primary_master_node is None:
# Failed to connect to a master node
app.primary_master_node = -1
elif app.master_conn is not None and uuid == app.primary_master_node.getUUID():
logging.critical("primary master node is broken")
app.connectToPrimaryMasterNode()
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by master node
if node.getNodeType() != MASTER_NODE_TYPE:
conn.close()
return
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
pass
else:
node = app.nm.getNodeByServer(conn.getAddress())
if node.getNodeType() == STORAGE_NODE_TYPE:
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary_master_node = primary_node
super(ClientEventHandler, self).peerBroken(conn)
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
if node_type == MASTER_NODE_TYPE:
n = nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None:
n.setUUID(uuid)
elif node_type == STORAGE_NODE_TYPE:
if uuid == INVALID_UUID:
# No interest.
continue
n = nm.getNodeByUUID(uuid)
if n is None:
n = StorageNode(server = addr, uuid = uuid)
nm.add(n)
else:
n.setServer(addr)
elif node_type == CLIENT_NODE_TYPE:
continue
n.setState(state)
# Master node handler
def handleSendPartitionTable(self, conn, packet, ptid, row_list):
# XXX: This handler should be in ClientAnswerEventHandler, since this
# This handler is in PrimaryBoostrapEventHandler, since this
# basicaly is an answer to askPrimaryMaster.
# Extract from P-NEO-Protocol.Description:
# Connection to primary master node (PMN in service state)
# CN -> PMN : askPrimaryMaster
# PMN -> CN : answerPrimaryMaster containing primary uuid and no
# known master list
# PMN -> CN : notifyNodeInformation containing list of all nodes
# PMN -> CN : sendPartitionTable containing partition table id and
# PMN -> CN : notifyNodeInformation containing list of all
# ASK_STORE_TRANSACTION# PMN -> CN : sendPartitionTable containing partition table id and
# list of rows
# notifyNodeInformation is valid as asynchrounous event, but
# sendPartitionTable is only triggered after askPrimaryMaster.
......@@ -209,22 +285,33 @@ class ClientEventHandler(BaseClientEventHandler):
nm.add(node)
pt.setCell(offset, node, state)
def handleNotifyNodeInformation(self, conn, packet, node_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
class PrimaryEventHandler(BaseClientEventHandler):
# Handler used to communicate with the primary master
def connectionClosed(self, conn):
logging.critical("connection to primary master node closed")
# Close connection
app = self.app
app.master_conn.close()
app.master_conn = None
app.primary_master_node = None
app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).peerBroken(conn)
def timeoutExpired(self, conn):
logging.critical("connection timeout to primary master node expired")
self.app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
logging.critical("primary master node is broken")
self.app.connectToPrimaryMasterNode()
super(PrimaryEventHandler, self).peerBroken(conn)
def handleNotifyNodeInformation(self, conn, packet, node_list):
app = self.app
nm = app.nm
node = nm.getNodeByUUID(uuid)
# This must be sent only by a primary master node.
# Note that this may be sent before I know that it is
# a primary master node.
if node.getNodeType() != MASTER_NODE_TYPE:
logging.warn('ignoring notify node information from %s',
dump(uuid))
return
for node_type, ip_address, port, uuid, state in node_list:
# Register new nodes.
addr = (ip_address, port)
......@@ -258,17 +345,6 @@ class ClientEventHandler(BaseClientEventHandler):
app = self.app
nm = app.nm
pt = app.pt
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE \
or app.primary_master_node is None \
or app.primary_master_node.getUUID() != uuid:
return
if app.ptid >= ptid:
# Ignore this packet.
......@@ -282,34 +358,50 @@ class ClientEventHandler(BaseClientEventHandler):
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
if state == DISCARDED_STATE:
if state in (DISCARDED_STATE, FEEDING_STATE):
pt.dropNode(node)
else:
pt.setCell(offset, node, state)
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
app.setTID(tid)
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
def handleAnswerNewOIDs(self, conn, packet, oid_list):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
class ClientAnswerEventHandler(BaseClientEventHandler):
"""This class handles events only expected as answers to requests."""
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid == app.getTID():
app.setTransactionFinished()
class StorageBootstrapEventHandler(BaseClientEventHandler):
# Handler used when connecting to a storage node
def connectionFailed(self, conn):
# Connection to a storage node failed
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).connectionFailed(conn)
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageBootstrapEventHandler, self).timeoutExpired(conn)
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
super(StorageBootstrapEventHandler, self).peerBroken(conn)
def handleNotReady(self, conn, packet, message):
app = self.app
......@@ -321,15 +413,14 @@ class ClientAnswerEventHandler(BaseClientEventHandler):
app = self.app
node = app.nm.getNodeByServer(conn.getAddress())
# It can be eiter a master node or a storage node
if node_type == CLIENT_NODE_TYPE:
if node_type != STORAGE_NODE_TYPE:
conn.close()
return
if conn.getAddress() != (ip_address, port):
# The server address is different! Then why was
# the connection successful?
logging.error('%s:%d is waiting for %s:%d',
conn.getAddress()[0], conn.getAddress()[1],
ip_address, port)
conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
app.nm.remove(node)
conn.close()
return
......@@ -337,89 +428,47 @@ class ClientAnswerEventHandler(BaseClientEventHandler):
conn.setUUID(uuid)
node.setUUID(uuid)
if node_type == MASTER_NODE_TYPE:
# Create partition table if necessary
if app.pt is None:
app.pt = PartitionTable(num_partitions, num_replicas)
app.num_partitions = num_partitions
app.num_replicas = num_replicas
if your_uuid != INVALID_UUID:
# got an uuid from the primary master
app.uuid = your_uuid
class StorageEventHandler(BaseClientEventHandler):
# Handle all messages related to ZODB operations
# Ask a primary master.
conn.lock()
try:
msg_id = conn.getNextId()
p = Packet()
p.askPrimaryMaster(msg_id)
conn.addPacket(p)
conn.expectMessage(msg_id)
self.dispatcher.register(conn, msg_id, app.getQueue())
finally:
conn.unlock()
elif node_type == STORAGE_NODE_TYPE:
app.storage_node = node
def connectionClosed(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
logging.info("connection to storage node %s closed", node.getServer())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageEventHandler, self).connectionClosed(conn)
def timeoutExpired(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
super(StorageEventHandler, self).timeoutExpired(conn)
# Master node handler
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, known_master_list):
uuid = conn.getUUID()
if uuid is None:
self.handleUnexpectedPacket(conn, packet)
return
def peerBroken(self, conn):
node = self.app.nm.getNodeByServer(conn.getAddress())
self._dealWithStorageFailure(conn, node, BROKEN_STATE)
super(StorageEventHandler, self).peerBroken(conn)
def handleInvalidateObjects(self, conn, packet, oid_list, tid):
app = self.app
node = app.nm.getNodeByUUID(uuid)
# This must be sent only by primary master node
if node.getNodeType() != MASTER_NODE_TYPE:
return
# Register new master nodes.
for ip_address, port, uuid in known_master_list:
addr = (ip_address, port)
n = app.nm.getNodeByServer(addr)
if n is None:
n = MasterNode(server = addr)
app.nm.add(n)
if uuid != INVALID_UUID:
# If I don't know the UUID yet, believe what the peer
# told me at the moment.
if n.getUUID() is None or n.getUUID() != uuid:
n.setUUID(uuid)
if primary_uuid != INVALID_UUID:
# The primary master is defined.
if app.primary_master_node is not None \
and app.primary_master_node.getUUID() != primary_uuid:
# There are multiple primary master nodes. This is
# dangerous.
raise ElectionFailure, 'multiple primary master nodes'
primary_node = app.nm.getNodeByUUID(primary_uuid)
if primary_node is None:
# I don't know such a node. Probably this information
# is old. So ignore it.
app._cache_lock_acquire()
try:
# ZODB required a dict with oid as key, so create it
oids = {}
for oid in oid_list:
oids[oid] = tid
try:
del app.mq_cache[oid]
except KeyError:
pass
else:
if primary_node.getUUID() == primary_uuid:
# Whatever the situation is, I trust this master.
app.primary_master_node = primary_node
def handleAnswerNewTID(self, conn, packet, tid):
app = self.app
app.setTID(tid)
def handleAnswerNewOIDs(self, conn, packet, oid_list):
app = self.app
app.new_oid_list = oid_list
app.new_oid_list.reverse()
db = app.getDB()
if db is not None:
db.invalidate(tid, oids)
finally:
app._cache_lock_release()
def handleNotifyTransactionFinished(self, conn, packet, tid):
app = self.app
if tid == app.getTID():
app.setTransactionFinished()
def handleStopOperation(self, conn, packet):
logging.critical("master node ask to stop operation")
# Storage node handler
def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial, compression,
checksum, data):
app = self.app
......@@ -470,3 +519,4 @@ class ClientAnswerEventHandler(BaseClientEventHandler):
def handleAnswerTIDs(self, conn, packet, tid_list):
app = self.app
app.local_var.node_tids[conn.getUUID()] = tid_list
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment