Commit ecd05c50 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Split storage handlers. As for the primary master, all incoming connections are

handled by the identification handler which apply the right handler depending on
the peer node type. Client and storage incoming connections are refused until
the storage reach the operation state, but the listening port is open to ensure
it's available, tryAgain errors are sent to wrong nodes.
Remove decorators because connection type is implied by handler type.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@752 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent 733e7e90
...@@ -32,9 +32,10 @@ from neo.connection import ListeningConnection, ClientConnection ...@@ -32,9 +32,10 @@ from neo.connection import ListeningConnection, ClientConnection
from neo.exception import OperationFailure, PrimaryFailure from neo.exception import OperationFailure, PrimaryFailure
from neo.storage.bootstrap import BootstrapEventHandler from neo.storage.bootstrap import BootstrapEventHandler
from neo.storage.verification import VerificationEventHandler from neo.storage.verification import VerificationEventHandler
from neo.storage.operation import OperationEventHandler from neo.storage.operation import MasterOperationEventHandler
from neo.storage.hidden import HiddenEventHandler from neo.storage.hidden import HiddenEventHandler
from neo.storage.replicator import Replicator from neo.storage.replicator import Replicator
from neo.storage.identification import IdentificationEventHandler
from neo.connector import getConnectorHandler from neo.connector import getConnectorHandler
from neo.pt import PartitionTable from neo.pt import PartitionTable
from neo.util import dump from neo.util import dump
...@@ -72,6 +73,8 @@ class Application(object): ...@@ -72,6 +73,8 @@ class Application(object):
self.listening_conn = None self.listening_conn = None
self.master_conn = None self.master_conn = None
# ready is True when operational and got all informations
self.ready = False
self.has_node_information = False self.has_node_information = False
self.has_partition_table = False self.has_partition_table = False
...@@ -134,23 +137,21 @@ class Application(object): ...@@ -134,23 +137,21 @@ class Application(object):
for server in self.master_node_list: for server in self.master_node_list:
self.nm.add(MasterNode(server = server)) self.nm.add(MasterNode(server = server))
# Make a listening port
handler = IdentificationEventHandler(self)
self.listening_conn = ListeningConnection(self.em, handler,
addr=self.server, connector_handler=self.connector_handler)
# Connect to a primary master node, verify data, and # Connect to a primary master node, verify data, and
# start the operation. This cycle will be executed permentnly, # start the operation. This cycle will be executed permentnly,
# until the user explicitly requests a shutdown. # until the user explicitly requests a shutdown.
while 1: while 1:
self.operational = False self.operational = False
# refuse any incoming connections for now
if self.listening_conn is not None:
self.listening_conn.close()
self.listening_conn = None
# look for the primary master # look for the primary master
self.connectToPrimaryMaster() self.connectToPrimaryMaster()
assert self.master_conn is not None assert self.master_conn is not None
if self.uuid == INVALID_UUID: if self.uuid == INVALID_UUID:
raise RuntimeError, 'No UUID supplied from the primary master' raise RuntimeError, 'No UUID supplied from the primary master'
# Make a listening port when connected to the primary
self.listening_conn = ListeningConnection(self.em, None,
addr=self.server, connector_handler=self.connector_handler)
try: try:
while 1: while 1:
try: try:
...@@ -241,32 +242,28 @@ class Application(object): ...@@ -241,32 +242,28 @@ class Application(object):
logging.info('verifying data') logging.info('verifying data')
handler = VerificationEventHandler(self) handler = VerificationEventHandler(self)
self.master_conn.setHandler(handler)
em = self.em em = self.em
# Make sure that every connection has the verfication event handler.
for conn in em.getConnectionList():
conn.setHandler(handler)
while not self.operational: while not self.operational:
em.poll(1) em.poll(1)
# ask node list # ask node list and partition table
self.master_conn.ask(protocol.askNodeInformation()) self.master_conn.ask(protocol.askNodeInformation())
self.master_conn.ask(protocol.askPartitionTable(())) self.master_conn.ask(protocol.askPartitionTable(()))
while not self.has_node_information or not self.has_partition_table: while not self.has_node_information or not self.has_partition_table:
em.poll(1) em.poll(1)
self.ready = True
def doOperation(self): def doOperation(self):
"""Handle everything, including replications and transactions.""" """Handle everything, including replications and transactions."""
logging.info('doing operation') logging.info('doing operation')
handler = OperationEventHandler(self)
em = self.em em = self.em
nm = self.nm nm = self.nm
# Make sure that every connection has the verfication event handler. handler = MasterOperationEventHandler(self)
for conn in em.getConnectionList(): self.master_conn.setHandler(handler)
conn.setHandler(handler)
# Forget all unfinished data. # Forget all unfinished data.
self.dm.dropUnfinishedData() self.dm.dropUnfinishedData()
...@@ -321,7 +318,6 @@ class Application(object): ...@@ -321,7 +318,6 @@ class Application(object):
def getPartition(self, oid_or_tid): def getPartition(self, oid_or_tid):
return unpack('!Q', oid_or_tid)[0] % self.num_partitions return unpack('!Q', oid_or_tid)[0] % self.num_partitions
def shutdown(self): def shutdown(self):
"""Close all connections and exit""" """Close all connections and exit"""
for c in self.em.getConnectionList(): for c in self.em.getConnectionList():
......
#
# Copyright (C) 2006-2009 Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
import logging
from neo import protocol
from neo.storage.handler import StorageEventHandler
from neo.protocol import INVALID_SERIAL, INVALID_TID, \
INVALID_PARTITION, BROKEN_STATE, TEMPORARILY_DOWN_STATE, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump
class IdentificationEventHandler(StorageEventHandler):
""" Handler used for incoming connections during operation state """
def connectionClosed(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def timeoutExpired(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def peerBroken(self, conn):
logging.warning('lost a node in IdentificationEventHandler')
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
# reject any incoming connections if not ready
if not self.app.ready:
raise protocol.notReady('try again')
app, nm = self.app, self.app.nm
server = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
logging.error('reject an unknown node %s', dump(uuid))
raise protocol.NotReadyError
# If this node is broken, reject it.
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
# choose the handler according to the node type
if node_type == protocol.CLIENT_NODE_TYPE:
from neo.storage.operation import ClientOperationEventHandler
handler = ClientOperationEventHandler
elif node_type == protocol.STORAGE_NODE_TYPE:
from neo.storage.operation import StorageOperationEventHandler
handler = StorageOperationEventHandler
else:
raise protocol.protocolError('reject non-client-or-storage node')
# apply the handler and set up the connection
handler = handler(self.app)
conn.setHandler(handler)
conn.setUUID(uuid)
node.setUUID(uuid)
# FIXME: here we should use pt.getPartitions() and pt.getReplicas()
args = (STORAGE_NODE_TYPE, app.uuid, app.server[0], app.server[1],
app.num_partitions, app.num_replicas, uuid)
# accept the identification and trigger an event
conn.answer(protocol.acceptNodeIdentification(*args), packet)
handler.connectionCompleted(conn)
...@@ -25,11 +25,9 @@ from neo.protocol import INVALID_SERIAL, INVALID_TID, \ ...@@ -25,11 +25,9 @@ from neo.protocol import INVALID_SERIAL, INVALID_TID, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
DISCARDED_STATE, OUT_OF_DATE_STATE DISCARDED_STATE, OUT_OF_DATE_STATE
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import StorageNode
from neo.connection import ClientConnection
from neo.protocol import Packet, UnexpectedPacketError from neo.protocol import Packet, UnexpectedPacketError
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo import decorators
class TransactionInformation(object): class TransactionInformation(object):
"""This class represents information on a transaction.""" """This class represents information on a transaction."""
...@@ -56,21 +54,6 @@ class TransactionInformation(object): ...@@ -56,21 +54,6 @@ class TransactionInformation(object):
class OperationEventHandler(StorageEventHandler): class OperationEventHandler(StorageEventHandler):
"""This class deals with events for a operation phase.""" """This class deals with events for a operation phase."""
def connectionCompleted(self, conn):
# FIXME this must be implemented for replications.
raise NotImplementedError
def connectionFailed(self, conn):
# FIXME this must be implemented for replications.
raise NotImplementedError
def connectionAccepted(self, conn, s, addr):
"""Called when a connection is accepted."""
# Client nodes and other storage nodes may connect. Also,
# master nodes may connect, only if they misunderstand that
# I am a master node.
StorageEventHandler.connectionAccepted(self, conn, s, addr)
def dealWithClientFailure(self, uuid): def dealWithClientFailure(self, uuid):
if uuid is not None: if uuid is not None:
app = self.app app = self.app
...@@ -132,130 +115,49 @@ class OperationEventHandler(StorageEventHandler): ...@@ -132,130 +115,49 @@ class OperationEventHandler(StorageEventHandler):
StorageEventHandler.peerBroken(self, conn) StorageEventHandler.peerBroken(self, conn)
@decorators.server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
addr = (ip_address, port)
node = app.nm.getNodeByUUID(uuid)
if node is None:
if node_type != MASTER_NODE_TYPE:
# If I do not know such a node, and it is not even a master
# node, simply reject it.
logging.error('reject an unknown node %s', dump(uuid))
raise protocol.NotReadyError
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
if node_type == MASTER_NODE_TYPE:
conn.abort()
@decorators.client_connection_required class ClientAndStorageOperationEventHandler(OperationEventHandler):
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
raise NotImplementedError
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAskTIDs(self, conn, packet, first, last, partition):
known_master_list): # This method is complicated, because I must return TIDs only
raise UnexpectedPacketError # about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
@decorators.client_connection_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory. if partition == INVALID_PARTITION:
app.ptid = ptid # Collect all usable partitions for me.
for offset, uuid, state in cell_list: getCellList = app.pt.getCellList
node = nm.getNodeByUUID(uuid) partition_list = []
if node is None: for offset in xrange(app.num_partitions):
node = StorageNode(uuid = uuid) for cell in getCellList(offset, readable=True):
if uuid != app.uuid: if cell.getUUID() == app.uuid:
node.setState(TEMPORARILY_DOWN_STATE) partition_list.append(offset)
nm.add(node) break
pt.setCell(offset, node, state) else:
partition_list = [partition]
if uuid == app.uuid:
# If this is for myself, this can affect replications.
if state == DISCARDED_STATE:
app.replicator.removePartition(offset)
elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset)
# Then, the database. tid_list = app.dm.getTIDList(first, last - first,
app.dm.changePartitionTable(ptid, cell_list) app.num_partitions, partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet)
@decorators.client_connection_required def handleAskObjectHistory(self, conn, packet, oid, first, last):
def handleStopOperation(self, conn, packet): if first >= last:
raise OperationFailure('operation stopped') raise protocol.ProtocolError( 'invalid offsets')
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app app = self.app
t = app.dm.getTransaction(tid) history_list = app.dm.getObjectHistory(oid, first, last - first)
if history_list is None:
if t is None: history_list = []
p = protocol.tidNotFound('%s does not exist' % dump(tid)) p = protocol.answerObjectHistory(oid, history_list)
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet) conn.answer(p, packet)
@decorators.client_connection_required
def handleLockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction()) class ClientOperationEventHandler(ClientAndStorageOperationEventHandler):
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
@decorators.client_connection_required
def handleUnlockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del app.load_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid)
del app.transaction_dict[tid]
# Now it may be possible to execute some events. def connectionCompleted(self, conn):
app.executeQueuedEvents() ClientAndStorageOperationEventHandler.connectionCompleted(self, conn)
except KeyError:
pass
def handleAskObject(self, conn, packet, oid, serial, tid): def handleAskObject(self, conn, packet, oid, serial, tid):
app = self.app app = self.app
...@@ -283,42 +185,27 @@ class OperationEventHandler(StorageEventHandler): ...@@ -283,42 +185,27 @@ class OperationEventHandler(StorageEventHandler):
p = protocol.oidNotFound('%s does not exist' % dump(oid)) p = protocol.oidNotFound('%s does not exist' % dump(oid))
conn.answer(p, packet) conn.answer(p, packet)
def handleAskTIDs(self, conn, packet, first, last, partition): def handleAbortTransaction(self, conn, packet, tid):
# This method is complicated, because I must return TIDs only uuid = conn.getUUID()
# about usable partitions assigned to me.
if first >= last:
raise protocol.ProtocolError('invalid offsets')
app = self.app app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
try:
del app.load_lock_dict[oid]
except KeyError:
pass
del app.store_lock_dict[oid]
if partition == INVALID_PARTITION: del app.transaction_dict[tid]
# Collect all usable partitions for me.
getCellList = app.pt.getCellList
partition_list = []
for offset in xrange(app.num_partitions):
for cell in getCellList(offset, readable=True):
if cell.getUUID() == app.uuid:
partition_list.append(offset)
break
else:
partition_list = [partition]
tid_list = app.dm.getTIDList(first, last - first,
app.num_partitions, partition_list)
conn.answer(protocol.answerTIDs(tid_list), packet)
def handleAskObjectHistory(self, conn, packet, oid, first, last):
if first >= last:
raise protocol.ProtocolError( 'invalid offsets')
app = self.app # Now it may be possible to execute some events.
history_list = app.dm.getObjectHistory(oid, first, last - first) app.executeQueuedEvents()
if history_list is None: except KeyError:
history_list = [] pass
p = protocol.answerObjectHistory(oid, history_list)
conn.answer(p, packet)
@decorators.identification_required
def handleAskStoreTransaction(self, conn, packet, tid, user, desc, def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
ext, oid_list): ext, oid_list):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -327,7 +214,6 @@ class OperationEventHandler(StorageEventHandler): ...@@ -327,7 +214,6 @@ class OperationEventHandler(StorageEventHandler):
t.addTransaction(oid_list, user, desc, ext) t.addTransaction(oid_list, user, desc, ext)
conn.answer(protocol.answerStoreTransaction(tid), packet) conn.answer(protocol.answerStoreTransaction(tid), packet)
@decorators.identification_required
def handleAskStoreObject(self, conn, packet, oid, serial, def handleAskStoreObject(self, conn, packet, oid, serial,
compression, checksum, data, tid): compression, checksum, data, tid):
uuid = conn.getUUID() uuid = conn.getUUID()
...@@ -364,35 +250,11 @@ class OperationEventHandler(StorageEventHandler): ...@@ -364,35 +250,11 @@ class OperationEventHandler(StorageEventHandler):
conn.answer(p, packet) conn.answer(p, packet)
app.store_lock_dict[oid] = tid app.store_lock_dict[oid] = tid
@decorators.identification_required
def handleAbortTransaction(self, conn, packet, tid):
uuid = conn.getUUID()
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
try:
del app.load_lock_dict[oid]
except KeyError:
pass
del app.store_lock_dict[oid]
del app.transaction_dict[tid] class StorageOperationEventHandler(ClientAndStorageOperationEventHandler):
# Now it may be possible to execute some events. def connectionCompleted(self, conn):
app.executeQueuedEvents() ClientAndStorageOperationEventHandler.connectionCompleted(self, conn)
except KeyError:
pass
@decorators.client_connection_required
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
self.app.replicator.setCriticalTID(packet, ltid)
@decorators.client_connection_required
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list)
def handleAskOIDs(self, conn, packet, first, last, partition): def handleAskOIDs(self, conn, packet, first, last, partition):
# This method is complicated, because I must return OIDs only # This method is complicated, because I must return OIDs only
...@@ -413,7 +275,94 @@ class OperationEventHandler(StorageEventHandler): ...@@ -413,7 +275,94 @@ class OperationEventHandler(StorageEventHandler):
break break
else: else:
partition_list = [partition] partition_list = [partition]
oid_list = app.dm.getOIDList(first, last - first, oid_list = app.dm.getOIDList(first, last - first,
app.num_partitions, partition_list) app.num_partitions, partition_list)
conn.answer(protocol.answerOIDs(oid_list), packet) conn.answer(protocol.answerOIDs(oid_list), packet)
class MasterOperationEventHandler(OperationEventHandler):
def handleStopOperation(self, conn, packet):
raise OperationFailure('operation stopped')
def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
self.app.replicator.setCriticalTID(packet, ltid)
def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
self.app.replicator.setUnfinishedTIDList(tid_list)
def handleAskTransactionInformation(self, conn, packet, tid):
app = self.app
t = app.dm.getTransaction(tid)
if t is None:
p = protocol.tidNotFound('%s does not exist' % dump(tid))
else:
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet)
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that
the information is only about changes from the previous."""
app = self.app
nm = app.nm
pt = app.pt
if app.ptid >= ptid:
# Ignore this packet.
logging.info('ignoring older partition changes')
return
# First, change the table on memory.
app.ptid = ptid
for offset, uuid, state in cell_list:
node = nm.getNodeByUUID(uuid)
if node is None:
node = StorageNode(uuid = uuid)
if uuid != app.uuid:
node.setState(TEMPORARILY_DOWN_STATE)
nm.add(node)
pt.setCell(offset, node, state)
if uuid == app.uuid:
# If this is for myself, this can affect replications.
if state == DISCARDED_STATE:
app.replicator.removePartition(offset)
elif state == OUT_OF_DATE_STATE:
app.replicator.addPartition(offset)
# Then, the database.
app.dm.changePartitionTable(ptid, cell_list)
logging.info('Partition table updated:')
self.app.pt.log()
def handleLockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
app.load_lock_dict[o[0]] = tid
app.dm.storeTransaction(tid, object_list, t.getTransaction())
except KeyError:
pass
conn.answer(protocol.notifyInformationLocked(tid), packet)
def handleUnlockInformation(self, conn, packet, tid):
app = self.app
try:
t = app.transaction_dict[tid]
object_list = t.getObjectList()
for o in object_list:
oid = o[0]
del app.load_lock_dict[oid]
del app.store_lock_dict[oid]
app.dm.finishTransaction(tid)
del app.transaction_dict[tid]
# Now it may be possible to execute some events.
app.executeQueuedEvents()
except KeyError:
pass
...@@ -24,10 +24,8 @@ from neo.protocol import INVALID_OID, INVALID_TID, \ ...@@ -24,10 +24,8 @@ from neo.protocol import INVALID_OID, INVALID_TID, \
MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \ MASTER_NODE_TYPE, STORAGE_NODE_TYPE, \
Packet, UnexpectedPacketError Packet, UnexpectedPacketError
from neo.util import dump from neo.util import dump
from neo.node import MasterNode, StorageNode, ClientNode from neo.node import StorageNode
from neo.connection import ClientConnection
from neo.exception import PrimaryFailure, OperationFailure from neo.exception import PrimaryFailure, OperationFailure
from neo import decorators
class VerificationEventHandler(StorageEventHandler): class VerificationEventHandler(StorageEventHandler):
"""This class deals with events for a verification phase.""" """This class deals with events for a verification phase."""
...@@ -39,67 +37,23 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -39,67 +37,23 @@ class VerificationEventHandler(StorageEventHandler):
StorageEventHandler.connectionAccepted(self, conn, s, addr) StorageEventHandler.connectionAccepted(self, conn, s, addr)
def timeoutExpired(self, conn): def timeoutExpired(self, conn):
if not conn.isServerConnection(): # If a primary master node timeouts, I cannot continue.
# If a primary master node timeouts, I cannot continue. logging.critical('the primary master node times out')
logging.critical('the primary master node times out') raise PrimaryFailure('the primary master node times out')
raise PrimaryFailure('the primary master node times out')
StorageEventHandler.timeoutExpired(self, conn) StorageEventHandler.timeoutExpired(self, conn)
def connectionClosed(self, conn): def connectionClosed(self, conn):
if not conn.isServerConnection(): # If a primary master node closes, I cannot continue.
# If a primary master node closes, I cannot continue. logging.critical('the primary master node is dead')
logging.critical('the primary master node is dead') raise PrimaryFailure('the primary master node is dead')
raise PrimaryFailure('the primary master node is dead')
StorageEventHandler.connectionClosed(self, conn) StorageEventHandler.connectionClosed(self, conn)
def peerBroken(self, conn): def peerBroken(self, conn):
if not conn.isServerConnection(): # If a primary master node gets broken, I cannot continue.
# If a primary master node gets broken, I cannot continue. logging.critical('the primary master node is broken')
logging.critical('the primary master node is broken') raise PrimaryFailure('the primary master node is broken')
raise PrimaryFailure('the primary master node is broken')
StorageEventHandler.peerBroken(self, conn) StorageEventHandler.peerBroken(self, conn)
@decorators.server_connection_required
def handleRequestNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port, name):
self.checkClusterName(name)
app = self.app
if node_type != MASTER_NODE_TYPE:
logging.info('reject a connection from a non-master')
raise protocol.NotReadyError
addr = (ip_address, port)
node = app.nm.getNodeByServer(addr)
if node is None:
node = MasterNode(server = addr, uuid = uuid)
app.nm.add(node)
else:
# If this node is broken, reject it.
if node.getUUID() == uuid:
if node.getState() == BROKEN_STATE:
raise protocol.BrokenNodeDisallowedError
# Trust the UUID sent by the peer.
node.setUUID(uuid)
conn.setUUID(uuid)
p = protocol.acceptNodeIdentification(STORAGE_NODE_TYPE, app.uuid,
app.server[0], app.server[1], app.num_partitions,
app.num_replicas, uuid)
conn.answer(p, packet)
# Now the master node should know that I am not the right one.
conn.abort()
def handleAcceptNodeIdentification(self, conn, packet, node_type,
uuid, ip_address, port,
num_partitions, num_replicas, your_uuid):
raise UnexpectedPacketError
@decorators.client_connection_required
def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid, def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
known_master_list): known_master_list):
app = self.app app = self.app
...@@ -109,7 +63,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -109,7 +63,6 @@ class VerificationEventHandler(StorageEventHandler):
# But a primary master node is supposed not to send any info # But a primary master node is supposed not to send any info
# with this packet, so it would be useless. # with this packet, so it would be useless.
@decorators.client_connection_required
def handleAskLastIDs(self, conn, packet): def handleAskLastIDs(self, conn, packet):
app = self.app app = self.app
oid = app.dm.getLastOID() or INVALID_OID oid = app.dm.getLastOID() or INVALID_OID
...@@ -124,8 +77,9 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -124,8 +77,9 @@ class VerificationEventHandler(StorageEventHandler):
def handleAnswerPartitionTable(self, conn, packet, ptid, row_list): def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
assert not row_list assert not row_list
self.app.has_partition_table = True self.app.has_partition_table = True
logging.info('Got the partition table :')
self.app.pt.log()
@decorators.client_connection_required
def handleAskPartitionTable(self, conn, packet, offset_list): def handleAskPartitionTable(self, conn, packet, offset_list):
app = self.app app = self.app
row_list = [] row_list = []
...@@ -144,7 +98,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -144,7 +98,6 @@ class VerificationEventHandler(StorageEventHandler):
p = protocol.answerPartitionTable(app.ptid, row_list) p = protocol.answerPartitionTable(app.ptid, row_list)
conn.answer(p, packet) conn.answer(p, packet)
@decorators.client_connection_required
def handleSendPartitionTable(self, conn, packet, ptid, row_list): def handleSendPartitionTable(self, conn, packet, ptid, row_list):
"""A primary master node sends this packet to synchronize a partition """A primary master node sends this packet to synchronize a partition
table. Note that the message can be split into multiple packets.""" table. Note that the message can be split into multiple packets."""
...@@ -176,7 +129,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -176,7 +129,6 @@ class VerificationEventHandler(StorageEventHandler):
cell.getState())) cell.getState()))
app.dm.setPartitionTable(ptid, cell_list) app.dm.setPartitionTable(ptid, cell_list)
@decorators.client_connection_required
def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list): def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
"""This is very similar to Send Partition Table, except that """This is very similar to Send Partition Table, except that
the information is only about changes from the previous.""" the information is only about changes from the previous."""
...@@ -203,15 +155,12 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -203,15 +155,12 @@ class VerificationEventHandler(StorageEventHandler):
# Then, the database. # Then, the database.
app.dm.changePartitionTable(ptid, cell_list) app.dm.changePartitionTable(ptid, cell_list)
@decorators.client_connection_required
def handleStartOperation(self, conn, packet): def handleStartOperation(self, conn, packet):
self.app.operational = True self.app.operational = True
@decorators.client_connection_required
def handleStopOperation(self, conn, packet): def handleStopOperation(self, conn, packet):
raise OperationFailure('operation stopped') raise OperationFailure('operation stopped')
@decorators.client_connection_required
def handleAskUnfinishedTransactions(self, conn, packet): def handleAskUnfinishedTransactions(self, conn, packet):
tid_list = self.app.dm.getUnfinishedTIDList() tid_list = self.app.dm.getUnfinishedTIDList()
p = protocol.answerUnfinishedTransactions(tid_list) p = protocol.answerUnfinishedTransactions(tid_list)
...@@ -225,6 +174,8 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -225,6 +174,8 @@ class VerificationEventHandler(StorageEventHandler):
# not been finished. # not been finished.
t = app.dm.getTransaction(tid, all = True) t = app.dm.getTransaction(tid, all = True)
else: else:
# XXX: this should never be used since we don't accept incoming
# connections out of the operation state.
t = app.dm.getTransaction(tid) t = app.dm.getTransaction(tid)
if t is None: if t is None:
...@@ -233,7 +184,6 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -233,7 +184,6 @@ class VerificationEventHandler(StorageEventHandler):
p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0]) p = protocol.answerTransactionInformation(tid, t[1], t[2], t[3], t[0])
conn.answer(p, packet) conn.answer(p, packet)
@decorators.client_connection_required
def handleAskObjectPresent(self, conn, packet, oid, tid): def handleAskObjectPresent(self, conn, packet, oid, tid):
if self.app.dm.objectPresent(oid, tid): if self.app.dm.objectPresent(oid, tid):
p = protocol.answerObjectPresent(oid, tid) p = protocol.answerObjectPresent(oid, tid)
...@@ -242,16 +192,9 @@ class VerificationEventHandler(StorageEventHandler): ...@@ -242,16 +192,9 @@ class VerificationEventHandler(StorageEventHandler):
'%s:%s do not exist' % (dump(oid), dump(tid))) '%s:%s do not exist' % (dump(oid), dump(tid)))
conn.answer(p, packet) conn.answer(p, packet)
@decorators.client_connection_required
def handleDeleteTransaction(self, conn, packet, tid): def handleDeleteTransaction(self, conn, packet, tid):
self.app.dm.deleteTransaction(tid, all = True) self.app.dm.deleteTransaction(tid, all = True)
@decorators.client_connection_required
def handleCommitTransaction(self, conn, packet, tid): def handleCommitTransaction(self, conn, packet, tid):
self.app.dm.finishTransaction(tid) self.app.dm.finishTransaction(tid)
def handleLockInformation(self, conn, packet, tid):
pass
def handleUnlockInformation(self, conn, packet, tid):
pass
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment