#
# Copyright (C) 2006-2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

from neo import logging
import os, sys
from time import time, gmtime
from struct import pack, unpack

from neo.config import ConfigurationManager
from neo import protocol
from neo.protocol import RUNNING_STATE, TEMPORARILY_DOWN_STATE, DOWN_STATE, \
        UUID_NAMESPACES, BOOTING, INVALID_UUID
from neo.node import NodeManager, MasterNode, StorageNode, ClientNode, AdminNode
from neo.event import EventManager
from neo.connection import ListeningConnection, ClientConnection, ServerConnection
from neo.exception import ElectionFailure, PrimaryFailure, VerificationFailure, \
        OperationFailure
from neo.master.handlers import election, identification, secondary, recovery
from neo.master.handlers import verification, storage, client, shutdown
from neo.master.handlers import administration
from neo.master.pt import PartitionTable
from neo.util import dump
from neo.connector import getConnectorHandler

# Number of storage nodes awaited before a brand new partition table is built
# (see Application.buildFromScratch).
REQUIRED_NODE_NUMBER = 1

class Application(object):
    """The master node application."""

    def __init__(self, filename, section, uuid=None):
        """Read the configuration and initialise the master state.

        filename and section are handed to ConfigurationManager. When uuid
        is None a new one is generated in the master node namespace.

        Raises RuntimeError when the cluster name is empty, when the number
        of replicas is negative or when the number of partitions is not
        strictly positive.
        """
        config = ConfigurationManager(filename, section)
        self.connector_handler = getConnectorHandler(config.getConnector())

        self.name = config.getName()
        if len(self.name) == 0:
            raise RuntimeError('cluster name must be non-empty')

        self.server = config.getServer()
        logging.debug('IP address is %s, port is %d', *(self.server))

        # Exclude itself from the list.
        self.master_node_list = [n for n in config.getMasterNodeList()
                                 if n != self.server]
        logging.debug('master nodes are %s', self.master_node_list)

        # Internal attributes.
        self.em = EventManager()
        self.nm = NodeManager()

        # Partition table
        replicas, partitions = config.getReplicas(), config.getPartitions()
        if replicas < 0:
            raise RuntimeError('replicas must be a positive integer')
        if partitions <= 0:
            raise RuntimeError('partitions must be more than zero')
        self.pt = PartitionTable(partitions, replicas)
        logging.debug(
            'the number of replicas is %d, the number of partitions is %d, the name is %s',
            replicas, partitions, self.name)

        self.listening_conn = None
        # None while undecided, True once this node elected itself
        # (see electPrimary).
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None

        # Generate an UUID for self
        if uuid is None:
            uuid = self.getNewUUID(protocol.MASTER_NODE_TYPE)
        self.uuid = uuid

        # The last OID.
        self.loid = None
        # The last TID.
        self.ltid = None
        # The target node's uuid to request next.
        self.target_uuid = None

        # election related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()

        # verification related data
        self.unfinished_oid_set = set()
        self.unfinished_tid_set = set()
        self.asking_uuid_dict = {}
        self.object_present = False

        # service related data
        self.finishing_transaction_dict = {}

    def run(self):
        """Make sure that the status is sane and start a loop."""
        for server in self.master_node_list:
            self.nm.add(MasterNode(server = server))

        # Make a listening port.
        self.listening_conn = ListeningConnection(self.em, None,
            addr = self.server, connector_handler = self.connector_handler)

        self.cluster_state = BOOTING
        # Start the election of a primary master node.
        self.electPrimary()

        # Start a normal operation.
        while 1:
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                # Both roles loop forever and only leave by raising.
                raise RuntimeError('should not reach here')
            except (ElectionFailure, PrimaryFailure):
                # Forget all connections.
                for conn in self.em.getConnectionList():
                    if not conn.isListening():
                        conn.close()
                # Reelect a new primary master.
                self.electPrimary(bootstrap = False)

    def electPrimary(self, bootstrap = True):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections
        from others while attempting to connect to other master nodes
        at the same time. Note that storage nodes and client nodes
        may connect to self as well as master nodes.

        bootstrap selects how long a temporarily down master is awaited
        before being considered down (60 seconds when true, 10 otherwise).
        The method only returns once self.primary is True or
        self.primary_master_node is set; a failed election is retried.
        """
        logging.info('begin the election of a primary master')

        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()
        self.listening_conn.setHandler(election.ServerElectionHandler(self))
        client_handler = election.ClientElectionHandler(self)
        em = self.em
        nm = self.nm

        while 1:
            t = 0
            self.primary = None
            self.primary_master_node = None

            for node in nm.getMasterNodeList():
                self.unconnected_master_node_set.add(node.getServer())
                # For now, believe that every node should be available,
                # since down or broken nodes may be already repaired.
                node.setState(RUNNING_STATE)
            self.negotiating_master_node_set.clear()

            try:
                while 1:
                    current_time = time()
                    if current_time >= t + 1:
                        t = current_time
                        # Expire temporarily down nodes. For now, assume that a node
                        # which is down for 60 seconds is really down, if this is a
                        # bootstrap. 60 seconds may sound too long, but this is reasonable
                        # when rebooting many cluster machines. Otherwise, wait for only
                        # 10 seconds, because stopping the whole cluster for a long time
                        # is a bad idea.
                        if bootstrap:
                            expiration = 60
                        else:
                            expiration = 10
                        for node in nm.getMasterNodeList():
                            if node.getState() == TEMPORARILY_DOWN_STATE \
                                    and node.getLastStateChange() + expiration < current_time:
                                logging.info('%s is down' % (node, ))
                                node.setState(DOWN_STATE)
                                self.unconnected_master_node_set.discard(node.getServer())

                        # Try to connect to master nodes.
                        if self.unconnected_master_node_set:
                            # Iterate over a copy: the set is presumably
                            # updated by the election handlers meanwhile.
                            for addr in list(self.unconnected_master_node_set):
                                ClientConnection(em, client_handler, addr = addr,
                                                 connector_handler = self.connector_handler)
                    em.poll(1)
                    if (len(self.unconnected_master_node_set) == 0 \
                            and len(self.negotiating_master_node_set) == 0) \
                            or self.primary is not None:
                        break

                # Now there are three situations:
                #   - I am the primary master
                #   - I am secondary but don't know who is primary
                #   - I am secondary and know who is primary
                if self.primary is None:
                    # I am the primary.
                    self.primary = True
                    logging.debug('I am the primary, so sending an announcement')
                    for conn in em.getConnectionList():
                        if conn.isClient():
                            conn.notify(protocol.announcePrimaryMaster())
                            conn.abort()
                    closed = False
                    t = time()
                    while not closed:
                        em.poll(1)
                        closed = True
                        for conn in em.getConnectionList():
                            if conn.isClient():
                                closed = False
                                break
                        if t + 10 < time():
                            # Waited long enough: force the remaining
                            # outgoing connections closed.
                            for conn in em.getConnectionList():
                                if conn.isClient():
                                    conn.close()
                            closed = True
                else:
                    # Wait for an announcement. If this is too long, probably
                    # the primary master is down.
                    t = time()
                    while self.primary_master_node is None:
                        em.poll(1)
                        if t + 10 < time():
                            raise ElectionFailure('no primary master elected')

                    # Now I need only a connection to the primary master node.
                    primary = self.primary_master_node
                    addr = primary.getServer()
                    for conn in em.getConnectionList():
                        # Parentheses only spell out the precedence the
                        # original expression already had.
                        if conn.isServer() or \
                                (conn.isClient() and addr != conn.getAddress()):
                            conn.close()

                    # But if there is no such connection, something wrong happened.
                    for conn in em.getConnectionList():
                        if conn.isClient() and addr == conn.getAddress():
                            break
                    else:
                        raise ElectionFailure('no connection remains to the primary')

                return
            except ElectionFailure:
                # The exception is fetched through sys.exc_info() so that
                # this handler does not depend on either of the two
                # "except ... , name" / "except ... as name" spellings.
                m = sys.exc_info()[1]
                logging.error('election failed; %s' % m)

                # Ask all connected nodes to reelect a single primary master.
                for conn in em.getConnectionList():
                    if conn.isClient():
                        conn.notify(protocol.reelectPrimaryMaster())
                        conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                closed = False
                t = time()
                while not closed:
                    try:
                        em.poll(1)
                    except ElectionFailure:
                        # Already handling a failed election; ignore.
                        pass

                    closed = True
                    for conn in em.getConnectionList():
                        if conn.isClient():
                            # Still not closed.
                            closed = False
                            break

                    if time() > t + 10:
                        # If too long, do not wait.
                        break

                # Close all connections.
                for conn in em.getConnectionList():
                    if not conn.isListening():
                        conn.close()
                # Any retry uses the short expiration delay.
                bootstrap = False

    # XXX: should accept a node list and send at most one packet per peer
    def broadcastNodeInformation(self, node):
        """Broadcast a Notify Node Information packet.

        Information about a client node goes to identified master, storage
        and admin peers; information about a master or storage node goes to
        every identified peer; nothing is sent about an admin node.
        Raises RuntimeError for any other kind of node.
        """
        logging.debug('broadcasting node information')
        # The server address (second item) may be None.
        node_list = [(node.getType(), node.getServer(), node.getUUID(),
                      node.getState())]

        if node.isClient():
            # Only to master nodes and storage nodes.
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    n = self.nm.getNodeByUUID(c.getUUID())
                    if n.isMaster() or n.isStorage() or n.isAdmin():
                        c.notify(protocol.notifyNodeInformation(node_list))
        elif node.isMaster() or node.isStorage():
            for c in self.em.getConnectionList():
                if c.getUUID() is not None:
                    c.notify(protocol.notifyNodeInformation(node_list))
        elif not node.isAdmin():
            raise RuntimeError('unknown node type')

    def broadcastPartitionChanges(self, ptid, cell_list):
        """Broadcast a Notify Partition Changes packet.

        cell_list is sent, in packets of at most 10000 cells, to every
        identified client, storage and admin node.
        """
        logging.debug('broadcastPartitionChanges')
        self.pt.log()
        for c in self.em.getConnectionList():
            n = self.nm.getNodeByUUID(c.getUUID())
            if n is None:
                continue
            if n.isClient() or n.isStorage() or n.isAdmin():
                # Split the packet if too big. Each chunk is sliced into its
                # own name: rebinding cell_list here would truncate the data
                # for the following chunks and for the following connections.
                for start in xrange(0, len(cell_list), 10000):
                    chunk = cell_list[start:start + 10000]
                    c.notify(protocol.notifyPartitionChanges(ptid, chunk))

    def outdateAndBroadcastPartition(self):
        """ Outdate cell of non-working nodes and broadcast changes """
        cell_list = self.pt.outdate()
        if cell_list:
            self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)

    def sendPartitionTable(self, conn):
        """ Send the partition table through the given connection """
        row_list = []
        for offset in xrange(self.pt.getPartitions()):
            row_list.append((offset, self.pt.getRow(offset)))
            # Split the packet if too huge.
            if len(row_list) == 1000:
                conn.notify(protocol.sendPartitionTable(
                    self.pt.getID(), row_list))
                del row_list[:]
        if row_list:
            conn.notify(protocol.sendPartitionTable(self.pt.getID(), row_list))

    def sendNodesInformations(self, conn):
        """ Send informations on all nodes through the given connection """
        node_list = []
        for n in self.nm.getNodeList():
            if not n.isAdmin():
                try:
                    address = n.getServer()
                except TypeError:
                    address = None
                node_list.append((n.getType(), address, n.getUUID(),
                                  n.getState()))
                # Split the packet if too huge.
                if len(node_list) == 10000:
                    conn.notify(protocol.notifyNodeInformation(node_list))
                    del node_list[:]
        if node_list:
            conn.notify(protocol.notifyNodeInformation(node_list))

    def broadcastLastOID(self, oid):
        """Notify every identified storage node of the last OID."""
        logging.debug('Broadcast last OID to storages : %s' % dump(oid))
        packet = protocol.notifyLastOID(oid)
        for conn in self.em.getConnectionList():
            node = self.nm.getNodeByUUID(conn.getUUID())
            if node is not None and node.isStorage():
                conn.notify(packet)

    def buildFromScratch(self):
        """Create a new partition table from the first storage nodes known.

        Polls until REQUIRED_NODE_NUMBER storage nodes are known, puts them
        in running state, resets the last OID and TID to zero and builds
        the partition table (ID 1) with these nodes.
        """
        nm, em, pt = self.nm, self.em, self.pt
        logging.debug('creating a new partition table, wait for a storage node')
        # wait for some empty storage nodes, they are accepted
        while len(nm.getStorageNodeList()) < REQUIRED_NODE_NUMBER:
            em.poll(1)
        # take the first node available
        node_list = nm.getStorageNodeList()[:REQUIRED_NODE_NUMBER]
        for node in node_list:
            node.setState(protocol.RUNNING_STATE)
            self.broadcastNodeInformation(node)
        # reset IDs generators
        self.loid = '\0' * 8
        self.ltid = '\0' * 8
        # build the partition with this node
        pt.setID(pack('!Q', 1))
        pt.make(node_list)

    def recoverStatus(self):
        """Recover the status about the cluster. Obtain the last OID, the last TID,
        and the last Partition Table ID from storage nodes, then get back the latest
        partition table or make a new table from scratch, if this is the first time."""
        logging.info('begin the recovery of the status')

        self.changeClusterState(protocol.RECOVERING)
        em = self.em

        self.loid = None
        self.ltid = None
        self.pt.setID(None)
        self.target_uuid = None

        # collect the last partition table available; the loop ends when
        # something else (not visible here) changes the cluster state
        while self.cluster_state == protocol.RECOVERING:
            em.poll(1)

        logging.info('startup allowed')

        # build a new partition table
        if self.pt.getID() is None:
            self.buildFromScratch()

        # collect node that are connected but not in the selected partition
        # table and set them in pending state
        allowed_node_set = set(self.pt.getNodeList())
        refused_node_set = set(self.nm.getStorageNodeList()) - allowed_node_set
        for node in refused_node_set:
            node.setState(protocol.PENDING_STATE)
            self.broadcastNodeInformation(node)

        logging.debug('cluster starts with this partition table :')
        self.pt.log()

    def verifyTransaction(self, tid):
        """Check whether the unfinished transaction tid can be committed.

        Returns the set of UUIDs of the nodes holding readable cells for
        the transaction and its objects, or None when the transaction is
        not commitable. Raises VerificationFailure when nobody can be asked
        or when the partition table stops being operational.
        """
        em = self.em
        uuid_set = set()

        # Determine to which nodes I should ask.
        partition = self.pt.getPartition(tid)
        transaction_uuid_list = [cell.getUUID() for cell \
                in self.pt.getCellList(partition, readable=True)]
        if len(transaction_uuid_list) == 0:
            raise VerificationFailure
        uuid_set.update(transaction_uuid_list)

        # Gather OIDs.
        self.asking_uuid_dict = {}
        self.unfinished_oid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid in transaction_uuid_list:
                self.asking_uuid_dict[uuid] = False
                conn.ask(protocol.askTransactionInformation(tid))
        if len(self.asking_uuid_dict) == 0:
            raise VerificationFailure

        # Wait until no entry of asking_uuid_dict is False any more
        # (presumably flipped by the verification handler on answers).
        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        if self.unfinished_oid_set is None or len(self.unfinished_oid_set) == 0:
            # Not commitable.
            return None
        else:
            # Verify that all objects are present.
            for oid in self.unfinished_oid_set:
                self.asking_uuid_dict.clear()
                partition = self.pt.getPartition(oid)
                object_uuid_list = [cell.getUUID() for cell \
                        in self.pt.getCellList(partition, readable=True)]
                if len(object_uuid_list) == 0:
                    raise VerificationFailure
                uuid_set.update(object_uuid_list)

                self.object_present = True
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in object_uuid_list:
                        self.asking_uuid_dict[uuid] = False
                        conn.ask(protocol.askObjectPresent(oid, tid))

                while 1:
                    em.poll(1)
                    if not self.pt.operational():
                        raise VerificationFailure
                    if False not in self.asking_uuid_dict.values():
                        break

                if not self.object_present:
                    # Not commitable.
                    return None

        return uuid_set

    def verifyData(self):
        """Verify the data in storage nodes and clean them up, if necessary."""

        em, nm = self.em, self.nm
        self.changeClusterState(protocol.VERIFYING)

        # wait for any missing node
        logging.debug('waiting for the cluster to be operational')
        while not self.pt.operational():
            em.poll(1)

        logging.info('start to verify data')

        # Gather all unfinished transactions.
        #
        # FIXME this part requires more brainstorming. Currently, this deals with
        # only unfinished transactions. But how about finished transactions?
        # Suppose that A and B have an unfinished transaction. First, A and B are
        # asked to commit the transaction. Then, A succeeds. B gets down. Now,
        # A believes that the transaction has been committed, while B still believes
        # that the transaction is unfinished. Next, if B goes back and A is working,
        # no problem; because B's unfinished transaction will be committed correctly.
        # However, when B goes back, if A is down, what happens? If the state is
        # not very good, B may be asked to abort the transaction!
        #
        # This situation won't happen frequently, and B shouldn't be asked to drop
        # the transaction, if the cluster is not ready. However, there might be
        # some corner cases where this may happen. That's why more brainstorming
        # is required.
        self.asking_uuid_dict = {}
        self.unfinished_tid_set = set()
        for conn in em.getConnectionList():
            uuid = conn.getUUID()
            if uuid is not None:
                node = nm.getNodeByUUID(uuid)
                if node.isStorage():
                    self.asking_uuid_dict[uuid] = False
                    conn.ask(protocol.askUnfinishedTransactions())

        while 1:
            em.poll(1)
            if not self.pt.operational():
                raise VerificationFailure
            if False not in self.asking_uuid_dict.values():
                break

        # Gather OIDs for each unfinished TID, and verify whether the transaction
        # can be finished or must be aborted. This could be in parallel in theory,
        # but not so easy. Thus do it one-by-one at the moment.
        for tid in self.unfinished_tid_set:
            uuid_set = self.verifyTransaction(tid)
            if uuid_set is None:
                # Make sure that no node has this transaction.
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid is not None:
                        node = nm.getNodeByUUID(uuid)
                        if node.isStorage():
                            conn.notify(protocol.deleteTransaction(tid))
            else:
                for conn in em.getConnectionList():
                    uuid = conn.getUUID()
                    if uuid in uuid_set:
                        conn.ask(protocol.commitTransaction(tid))

            # If possible, send the packets now.
            em.poll(0)

        # At this stage, all non-working nodes are out-of-date.
        cell_list = self.pt.outdate()

        # Tweak the partition table, if the distribution of storage nodes
        # is not uniform.
        cell_list.extend(self.pt.tweak())

        # If anything changed, send the changes.
        if cell_list:
            self.broadcastPartitionChanges(self.pt.setNextID(), cell_list)

    def provideService(self):
        """This is the normal mode for a primary master node. Handle transactions
        and stop the service only if a catastrophy happens or the user commits
        a shutdown."""
        logging.info('provide service')
        em = self.em
        nm = self.nm

        self.changeClusterState(protocol.RUNNING)

        # This dictionary is used to hold information on transactions being finished.
        self.finishing_transaction_dict = {}

        # Now everything is passive.
        while True:
            try:
                em.poll(1)
            except OperationFailure:
                # If not operational, send Stop Operation packets to storage nodes
                # and client nodes. Abort connections to client nodes.
                logging.critical('No longer operational, so stopping the service')
                for conn in em.getConnectionList():
                    node = nm.getNodeByUUID(conn.getUUID())
                    if node is not None and (node.isStorage() or node.isClient()):
                        conn.notify(protocol.stopOperation())
                        if node.isClient():
                            conn.abort()

                # Then, go back, and restart.
                return

    def playPrimaryRole(self):
        """I play the primary role: recover the cluster status, then
        alternate data verification and service forever."""
        logging.info('play the primary role with %s (%s:%d)',
                dump(self.uuid), *(self.server))

        # all incoming connections identify through this handler
        self.listening_conn.setHandler(identification.IdentificationHandler(self))

        handler = secondary.SecondaryMasterHandler(self)
        em = self.em
        nm = self.nm

        # Make sure that every connection has the secondary event handler.
        for conn in em.getConnectionList():
            conn_uuid = conn.getUUID()
            if conn_uuid is not None:
                node = nm.getNodeByUUID(conn_uuid)
                assert node is not None
                assert node.getType() == protocol.MASTER_NODE_TYPE
                conn.setHandler(handler)

        # If I know any storage node, make sure that they are not in the running state,
        # because they are not connected at this stage.
        for node in nm.getStorageNodeList():
            if node.getState() == RUNNING_STATE:
                node.setState(TEMPORARILY_DOWN_STATE)

        # recover the cluster status at startup
        self.recoverStatus()

        while 1:
            try:
                self.verifyData()
            except VerificationFailure:
                # Verification is simply started over.
                continue
            self.provideService()

    def playSecondaryRole(self):
        """I play a secondary role, thus only wait for a primary master to fail."""
        logging.info('play the secondary role with %s (%s:%d)',
                dump(self.uuid), *(self.server))

        primary_master_handler = secondary.PrimaryMasterHandler(self)
        handler = identification.IdentificationHandler(self)
        em = self.em

        # Make sure that every connection has the secondary event handler.
        # At most one outgoing connection is expected here (the one to the
        # primary master, see the end of electPrimary).
        primary_master_found = False
        for conn in em.getConnectionList():
            if (not conn.isListening()) and conn.isClient():
                assert not primary_master_found
                primary_master_found = True
                conn.setHandler(primary_master_handler)
            else:
                conn.setHandler(handler)

        while 1:
            em.poll(1)

    def changeClusterState(self, state):
        """ Change the cluster state and apply right handler on each connections

        Does nothing when state is already the current cluster state.
        Every identified peer is notified of the new state; client and
        storage connections then get a fresh handler instance, and client
        connections are closed unless the new state is RUNNING.
        """
        if self.cluster_state == state:
            return
        nm, em = self.nm, self.em

        # select the storage handler
        if state == protocol.BOOTING:
            storage_handler = recovery.RecoveryHandler
        elif state == protocol.RECOVERING:
            storage_handler = recovery.RecoveryHandler
        elif state == protocol.VERIFYING:
            storage_handler = verification.VerificationHandler
        elif state == protocol.RUNNING:
            storage_handler = storage.StorageServiceHandler
        else:
            # NOTE(review): this exception is built but never raised, so an
            # unlisted state only fails later (unbound storage_handler) if a
            # storage node is connected. Raising here may break callers
            # passing another state such as protocol.STOPPING -- confirm
            # against the callers before adding the "raise".
            RuntimeError('Unexpected node type')

        # change handlers
        notification_packet = protocol.notifyClusterInformation(state)
        for conn in em.getConnectionList():
            node = nm.getNodeByUUID(conn.getUUID())
            if conn.isListening() or node is None:
                # not identified or listening, keep the identification handler
                continue
            conn.notify(notification_packet)
            if node.isAdmin() or node.isMaster():
                # those node types keep their own handler
                continue
            if node.isClient():
                if state != protocol.RUNNING:
                    conn.close()
                handler = client.ClientServiceHandler
            elif node.isStorage():
                handler = storage_handler
            handler = handler(self)
            conn.setHandler(handler)
            handler.connectionCompleted(conn)
        self.cluster_state = state

    def getNextTID(self):
        """Return a new TID (8 bytes), greater than the last one, and
        remember it as the last TID.

        The upper 32 bits encode the current UTC minute in mixed radix
        (years since 1900, month, day, hour, minute) and the lower 32 bits
        the position inside that minute scaled to 2**32. When this is not
        greater than the last TID, the last TID is incremented instead.
        """
        tm = time()
        gmt = gmtime(tm)
        upper = ((((gmt.tm_year - 1900) * 12 + gmt.tm_mon - 1) * 31 \
                  + gmt.tm_mday - 1) * 24 + gmt.tm_hour) * 60 + gmt.tm_min
        lower = int((gmt.tm_sec % 60 + (tm - int(tm))) / (60.0 / 65536.0 / 65536.0))
        tid = pack('!LL', upper, lower)
        if tid <= self.ltid:
            upper, lower = unpack('!LL', self.ltid)
            if lower == 0xffffffff:
                # This should not happen usually.
                # NOTE(review): the next minute is derived from the current
                # clock, not from the last TID; if the last TID is ahead of
                # the clock the result may not exceed it -- confirm.
                from datetime import timedelta, datetime
                d = datetime(gmt.tm_year, gmt.tm_mon, gmt.tm_mday,
                             gmt.tm_hour, gmt.tm_min) \
                        + timedelta(0, 60)
                upper = ((((d.year - 1900) * 12 + d.month - 1) * 31 \
                          + d.day - 1) * 24 + d.hour) * 60 + d.minute
                lower = 0
            else:
                lower += 1
            tid = pack('!LL', upper, lower)
        self.ltid = tid
        return tid

    def getNewOIDList(self, num_oids):
        """Allocate num_oids consecutive OIDs following the last OID.

        The last OID is advanced and broadcast to the storage nodes.
        Raises RuntimeError when the last OID is not known yet.
        """
        if self.loid is None:
            raise RuntimeError('I do not know the last OID')
        oid = unpack('!Q', self.loid)[0] + 1
        oid_list = [pack('!Q', oid + i) for i in xrange(num_oids)]
        if not oid_list:
            # Nothing was requested: keep the last OID as it is instead of
            # failing on oid_list[-1].
            return oid_list
        self.loid = oid_list[-1]
        self.broadcastLastOID(self.loid)
        return oid_list

    def getNewUUID(self, node_type):
        """Return a random UUID prefixed by the namespace of node_type.

        Raises RuntimeError when node_type has no UUID namespace.
        """
        # build an UUID
        uuid = os.urandom(15)
        while uuid == protocol.INVALID_UUID[1:]:
            uuid = os.urandom(15)
        # look for the prefix
        prefix = UUID_NAMESPACES.get(node_type, None)
        if prefix is None:
            raise RuntimeError('No UUID namespace found for this node type')
        return prefix + uuid

    def isValidUUID(self, uuid, addr):
        """Tell whether a peer at address addr may use uuid.

        The UUID is refused when it is None, when it is the UUID of this
        master, or when it already belongs to a node known with another
        server address.
        """
        node = self.nm.getNodeByUUID(uuid)
        if node is not None and node.getServer() is not None \
                and node.getServer() != addr:
            return False
        return uuid != self.uuid and uuid is not None

    def getClusterState(self):
        """Return the current cluster state."""
        return self.cluster_state

    def shutdown(self):
        """Close all connections and exit"""
        # change handler ('shutdown' is the handler module imported at the
        # top of the file, not this method)
        handler = shutdown.ShutdownHandler(self)
        for c in self.em.getConnectionList():
            c.setHandler(handler)

        # wait for all transaction to be finished
        while 1:
            self.em.poll(1)
            if len(self.finishing_transaction_dict) == 0:
                if self.cluster_state == protocol.RUNNING:
                    sys.exit("Application has been asked to shut down")
                else:
                    # no more transaction, ask clients to shutdown
                    logging.info("asking all clients to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getNodeByUUID(c.getUUID())
                        # Connections without a known node (e.g. not
                        # identified) have nobody to notify.
                        if node is not None and node.isClient():
                            node_list = [(node.getType(), node.getServer(),
                                          node.getUUID(), DOWN_STATE)]
                            c.notify(protocol.notifyNodeInformation(node_list))
                    # then ask storages and master nodes to shutdown
                    logging.info("asking all remaining nodes to shutdown")
                    for c in self.em.getConnectionList():
                        node = self.nm.getNodeByUUID(c.getUUID())
                        if node is not None and \
                                (node.isStorage() or node.isMaster()):
                            node_list = [(node.getType(), node.getServer(),
                                          node.getUUID(), DOWN_STATE)]
                            c.notify(protocol.notifyNodeInformation(node_list))
                    # then shutdown
                    sys.exit("Cluster has been asked to shut down")

    def identifyStorageNode(self, uuid, node):
        """Decide how an identifying storage node is accepted.

        Returns (uuid, state, handler) where handler is a handler class
        chosen from the cluster state. Raises protocol.NotReadyError while
        stopping, or while recovering when uuid is None, and RuntimeError
        for any other unhandled cluster state.
        """
        state = protocol.RUNNING_STATE
        handler = None
        if self.cluster_state == protocol.RECOVERING:
            if uuid is None:
                logging.info('reject empty storage node')
                raise protocol.NotReadyError
            handler = recovery.RecoveryHandler
        elif self.cluster_state == protocol.VERIFYING:
            if uuid is None or node is None:
                # if node is unknown, it has been forgotten when the current
                # partition was validated by the admin
                uuid = None
                state = protocol.PENDING_STATE
            handler = verification.VerificationHandler
        elif self.cluster_state == protocol.RUNNING:
            if uuid is None or node is None:
                # same as for verification
                uuid = None
                state = protocol.PENDING_STATE
            handler = storage.StorageServiceHandler
        elif self.cluster_state == protocol.STOPPING:
            raise protocol.NotReadyError
        else:
            raise RuntimeError('unhandled cluster state')
        return (uuid, state, handler)

    def identifyNode(self, node_type, uuid, node):
        """Decide how an identifying node of the given type is accepted.

        Returns (uuid, node, state, handler, klass) where handler is a
        handler class and klass the node class matching node_type.
        Raises protocol.ProtocolError for an unknown master node,
        protocol.NotReadyError for a client while the cluster is not
        running, and RuntimeError for an unknown node type.
        """
        state = protocol.RUNNING_STATE
        handler = identification.IdentificationHandler

        if node_type == protocol.ADMIN_NODE_TYPE:
            # always accept admin nodes
            klass = AdminNode
            handler = administration.AdministrationHandler
            logging.info('Accept an admin %s' % dump(uuid))
        elif node_type == protocol.MASTER_NODE_TYPE:
            if node is None:
                # unknown master, rejected
                raise protocol.ProtocolError('Reject an unknown master node')
            # always put other master in waiting state
            klass = MasterNode
            handler = secondary.SecondaryMasterHandler
            logging.info('Accept a master %s' % dump(uuid))
        elif node_type == protocol.CLIENT_NODE_TYPE:
            # refuse any client before running
            if self.cluster_state != protocol.RUNNING:
                logging.info('Reject a connection from a client')
                raise protocol.NotReadyError
            klass = ClientNode
            handler = client.ClientServiceHandler
            logging.info('Accept a client %s' % dump(uuid))
        elif node_type == protocol.STORAGE_NODE_TYPE:
            klass = StorageNode
            (uuid, state, handler) = self.identifyStorageNode(uuid, node)
            logging.info('Accept a storage (%s)' % state)
        else:
            # Fail explicitly instead of hitting an unbound 'klass' below.
            raise RuntimeError('unknown node type')
        return (uuid, node, state, handler, klass)