From e61935799154365b76db89eb693750c7fa748d81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9gory=20Wisniewski?= <gregory@nexedi.com> Date: Tue, 2 Feb 2010 10:18:05 +0000 Subject: [PATCH] Add master's recovery manager. git-svn-id: https://svn.erp5.org/repos/neo/trunk@1592 71dcc9de-d417-0410-9af5-da40c76e7ee4 --- neo/master/app.py | 88 ++----------------- neo/master/handlers/recovery.py | 72 --------------- neo/master/recovery.py | 149 ++++++++++++++++++++++++++++++++ 3 files changed, 155 insertions(+), 154 deletions(-) delete mode 100644 neo/master/handlers/recovery.py create mode 100644 neo/master/recovery.py diff --git a/neo/master/app.py b/neo/master/app.py index 73f1706d..7d40915c 100644 --- a/neo/master/app.py +++ b/neo/master/app.py @@ -27,17 +27,16 @@ from neo.node import NodeManager from neo.event import EventManager from neo.connection import ListeningConnection, ClientConnection from neo.exception import ElectionFailure, PrimaryFailure, OperationFailure -from neo.master.handlers import election, identification, secondary, recovery +from neo.master.handlers import election, identification, secondary from neo.master.handlers import storage, client, shutdown from neo.master.handlers import administration from neo.master.pt import PartitionTable from neo.master.transactions import TransactionManager from neo.master.verification import VerificationManager +from neo.master.recovery import RecoveryManager from neo.util import dump from neo.connector import getConnectorHandler -REQUIRED_NODE_NUMBER = 1 - class Application(object): """The master node application.""" @@ -74,6 +73,7 @@ class Application(object): self.primary = None self.primary_master_node = None self.cluster_state = None + self._startup_allowed = False # Generate an UUID for self uuid = config.getUUID() @@ -83,15 +83,12 @@ class Application(object): # The last OID. self.loid = None - # The target node's uuid to request next. - self.target_uuid = None # election related data self.unconnected_master_node_set = set() self.negotiating_master_node_set = set() self._current_manager = None - self._startup_allowed = False def run(self): """Make sure that the status is sane and start a loop.""" @@ -328,61 +325,6 @@ class Application(object): if node is not None and node.isStorage(): conn.notify(packet) - def buildFromScratch(self): - nm, em, pt = self.nm, self.em, self.pt - logging.debug('creating a new partition table, wait for a storage node') - # wait for some empty storage nodes, their are accepted - while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER: - em.poll(1) - # take the first node available - node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER] - for node in node_list: - node.setRunning() - self.broadcastNodesInformation(node_list) - # resert IDs generators - self.loid = '\0' * 8 - # build the partition with this node - pt.setID(pack('!Q', 1)) - pt.make(node_list) - - def recoverStatus(self): - """ - Recover the status about the cluster. Obtain the last OID, the last - TID, and the last Partition Table ID from storage nodes, then get - back the latest partition table or make a new table from scratch, - if this is the first time. - """ - logging.info('begin the recovery of the status') - - self.changeClusterState(ClusterStates.RECOVERING) - em = self.em - - self.loid = None - self.pt.setID(None) - self.target_uuid = None - - # collect the last partition table available - while not self._startup_allowed: - em.poll(1) - - logging.info('startup allowed') - - # build a new partition table - if self.pt.getID() is None: - self.buildFromScratch() - - # collect node that are connected but not in the selected partition - # table and set them in pending state - allowed_node_set = set(self.pt.getNodeList()) - refused_node_set = set(self.nm.getStorageList()) - allowed_node_set - for node in refused_node_set: - node.setPending() - self.broadcastNodesInformation(refused_node_set) - - logging.debug('cluster starts with loid=%s and this partition table :', - dump(self.loid)) - self.pt.log() - def provideService(self): """ This is the normal mode for a primary master node. Handle transactions @@ -438,7 +380,6 @@ class Application(object): assert node.isMaster() conn.setHandler(handler) - # If I know any storage node, make sure that they are not in the # running state, because they are not connected at this stage. for node in nm.getStorageList(): @@ -446,8 +387,7 @@ class Application(object): node.setTemporarilyDown() # recover the cluster status at startup - self.recoverStatus() - + self.runManager(RecoveryManager) while True: self.runManager(VerificationManager) self.provideService() @@ -509,9 +449,7 @@ class Application(object): # select the storage handler client_handler = client.ClientServiceHandler(self) - if state == ClusterStates.RECOVERING: - storage_handler = recovery.RecoveryHandler(self) - elif state == ClusterStates.RUNNING: + if state == ClusterStates.RUNNING: storage_handler = storage.StorageServiceHandler(self) elif self._current_manager is not None: storage_handler = self._current_manager.getHandler() @@ -606,21 +544,7 @@ class Application(object): def identifyStorageNode(self, uuid, node): state = NodeStates.RUNNING handler = None - if self.cluster_state == ClusterStates.RECOVERING: - # accept storage nodes when recovery is over - if uuid is None and self._startup_allowed: - logging.info('reject empty storage node') - raise protocol.NotReadyError - handler = recovery.RecoveryHandler(self) - elif self.cluster_state == ClusterStates.VERIFYING: - if uuid is None or node is None: - # if node is unknown, it has been forget when the current - # partition was validated by the admin - # Here the uuid is not cleared to allow lookup pending nodes by - # uuid from the test framework. It's safe since nodes with a - # conflicting UUID are rejected in the identification handler. - state = NodeStates.PENDING - elif self.cluster_state == ClusterStates.RUNNING: + if self.cluster_state == ClusterStates.RUNNING: if uuid is None or node is None: # same as for verification state = NodeStates.PENDING diff --git a/neo/master/handlers/recovery.py b/neo/master/handlers/recovery.py deleted file mode 100644 index b47f2d25..00000000 --- a/neo/master/handlers/recovery.py +++ /dev/null @@ -1,72 +0,0 @@ -# -# Copyright (C) 2006-2010 Nexedi SA -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with this program; if not, write to the Free Software -# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. - -from neo import logging - -from neo.protocol import Packets, ProtocolError -from neo.master.handlers import MasterHandler -from neo.util import dump - -class RecoveryHandler(MasterHandler): - """This class deals with events for a recovery phase.""" - - def connectionLost(self, conn, new_state): - node = self.app.nm.getByUUID(conn.getUUID()) - assert node is not None - if node.getState() == new_state: - return - node.setState(new_state) - - def connectionCompleted(self, conn): - # XXX: handler split review needed to remove this hack - if not self.app._startup_allowed: - # ask the last IDs to perform the recovery - conn.ask(Packets.AskLastIDs()) - - def answerLastIDs(self, conn, loid, ltid, lptid): - app = self.app - pt = app.pt - - # Get max values. - app.loid = max(loid, app.loid) - self.app.tm.setLastTID(ltid) - if lptid > pt.getID(): - # something newer - app.target_uuid = conn.getUUID() - app.pt.setID(lptid) - conn.ask(Packets.AskPartitionTable([])) - - def answerPartitionTable(self, conn, ptid, row_list): - uuid = conn.getUUID() - app = self.app - if uuid != app.target_uuid: - # If this is not from a target node, ignore it. - logging.warn('got answer partition table from %s while waiting ' \ - 'for %s', dump(uuid), dump(app.target_uuid)) - return - # load unknown storage nodes - for offset, row in row_list: - for uuid, state in row: - node = app.nm.getByUUID(uuid) - if node is None: - app.nm.createStorage(uuid=uuid) - # load partition in memory - try: - self.app.pt.load(ptid, row_list, self.app.nm) - except IndexError: - raise ProtocolError('Invalid offset') - diff --git a/neo/master/recovery.py b/neo/master/recovery.py new file mode 100644 index 00000000..50224d4c --- /dev/null +++ b/neo/master/recovery.py @@ -0,0 +1,149 @@ +# +# Copyright (C) 2006-2010 Nexedi SA +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. + +from struct import pack + +from neo import logging +from neo.util import dump +from neo.protocol import Packets, ProtocolError, ClusterStates, NodeStates +from neo.protocol import NotReadyError +from neo.master.handlers import MasterHandler + +REQUIRED_NODE_NUMBER = 1 + +class RecoveryManager(MasterHandler): + """ + Manage the cluster recovery + """ + + def __init__(self, app): + super(RecoveryManager, self).__init__(app) + # The target node's uuid to request next. + self.target_uuid = None + + def getHandler(self): + return self + + def identifyStorageNode(self, uuid, node): + """ + Returns the handler for storage nodes + """ + if uuid is None and self.app._startup_allowed: + logging.info('reject empty storage node') + raise NotReadyError + return (uuid, NodeStates.RUNNING, self) + + def run(self): + """ + Recover the status about the cluster. Obtain the last OID, the last + TID, and the last Partition Table ID from storage nodes, then get + back the latest partition table or make a new table from scratch, + if this is the first time. + """ + logging.info('begin the recovery of the status') + + self.app.changeClusterState(ClusterStates.RECOVERING) + em = self.app.em + + self.app.loid = None + self.app.pt.setID(None) + + # collect the last partition table available + while not self.app._startup_allowed: + em.poll(1) + + logging.info('startup allowed') + + # build a new partition table + if self.app.pt.getID() is None: + self.buildFromScratch() + + # collect node that are connected but not in the selected partition + # table and set them in pending state + allowed_node_set = set(self.app.pt.getNodeList()) + refused_node_set = set(self.app.nm.getStorageList()) - allowed_node_set + for node in refused_node_set: + node.setPending() + self.app.broadcastNodesInformation(refused_node_set) + + logging.debug('cluster starts with loid=%s and this partition table :', + dump(self.app.loid)) + self.app.pt.log() + + def buildFromScratch(self): + nm, em, pt = self.app.nm, self.app.em, self.app.pt + logging.debug('creating a new partition table, wait for a storage node') + # wait for some empty storage nodes, their are accepted + while len(nm.getStorageList()) < REQUIRED_NODE_NUMBER: + em.poll(1) + # take the first node available + node_list = nm.getStorageList()[:REQUIRED_NODE_NUMBER] + for node in node_list: + node.setRunning() + self.app.broadcastNodesInformation(node_list) + # resert IDs generators + self.app.loid = '\0' * 8 + # build the partition with this node + pt.setID(pack('!Q', 1)) + pt.make(node_list) + + def connectionLost(self, conn, new_state): + node = self.app.nm.getByUUID(conn.getUUID()) + assert node is not None + if node.getState() == new_state: + return + node.setState(new_state) + + def connectionCompleted(self, conn): + # XXX: handler split review needed to remove this hack + if not self.app._startup_allowed: + # ask the last IDs to perform the recovery + conn.ask(Packets.AskLastIDs()) + + def answerLastIDs(self, conn, loid, ltid, lptid): + app = self.app + pt = app.pt + + # Get max values. + app.loid = max(loid, app.loid) + self.app.tm.setLastTID(ltid) + if lptid > pt.getID(): + # something newer + self.target_uuid = conn.getUUID() + app.pt.setID(lptid) + conn.ask(Packets.AskPartitionTable([])) + + def answerPartitionTable(self, conn, ptid, row_list): + uuid = conn.getUUID() + app = self.app + if uuid != self.target_uuid: + # If this is not from a target node, ignore it. + logging.warn('got answer partition table from %s while waiting ' \ + 'for %s', dump(uuid), dump(self.target_uuid)) + return + # load unknown storage nodes + for offset, row in row_list: + for uuid, state in row: + node = app.nm.getByUUID(uuid) + if node is None: + app.nm.createStorage(uuid=uuid) + # load partition in memory + try: + self.app.pt.load(ptid, row_list, self.app.nm) + except IndexError: + raise ProtocolError('Invalid offset') + -- 2.30.9