From 6be00df78d8e29100fe0996afbf09c86868b4b56 Mon Sep 17 00:00:00 2001
From: Julien Muchembled <jm@nexedi.com>
Date: Thu, 16 Aug 2012 15:46:55 +0200
Subject: [PATCH] neoctl: change 'set node' command into 'kill', which kills a
 node safely

---
 TODO                                  |  3 --
 neo/admin/handler.py                  | 16 +------
 neo/lib/protocol.py                   |  3 +-
 neo/master/app.py                     |  2 +-
 neo/master/handlers/administration.py | 60 +++++++++++++--------------
 neo/master/handlers/secondary.py      |  6 ++-
 neo/neoctl/app.py                     | 41 +++++-------------
 neo/neoctl/neoctl.py                  | 20 ++++-----
 neo/storage/handlers/__init__.py      |  2 +-
 neo/tests/functional/__init__.py      |  6 +++
 neo/tests/functional/testMaster.py    | 14 ++++---
 neo/tests/functional/testStorage.py   |  4 +-
 12 files changed, 71 insertions(+), 106 deletions(-)

diff --git a/TODO b/TODO
index 75222503..dfca21ef 100644
--- a/TODO
+++ b/TODO
@@ -69,9 +69,6 @@ RC  - Review output of pylint (CODE)
       should be made a singleton (saves the CPU time needed to instanciates all
       the copies - often when a connection is established, saves the memory
       used by each copy).
-    - Consider replace setNodeState admin packet by one per action, like
-      dropNode to reduce packet processing complexity and reduce bad actions
-      like set a node in TEMPORARILY_DOWN state.
     - Review node notfications. Eg. A storage don't have to be notified of new
       clients but only when one is lost.
     - Review transactional isolation of various methods
diff --git a/neo/admin/handler.py b/neo/admin/handler.py
index 350058d0..8a65ee01 100644
--- a/neo/admin/handler.py
+++ b/neo/admin/handler.py
@@ -53,21 +53,6 @@ class AdminEventHandler(EventHandler):
         p = Packets.AnswerNodeList(node_information_list)
         conn.answer(p)
 
-    @check_primary_master
-    def setNodeState(self, conn, uuid, state, modify_partition_table):
-        logging.info("set node state for %s-%s", uuid_str(uuid), state)
-        node = self.app.nm.getByUUID(uuid)
-        if node is None:
-            raise protocol.ProtocolError('invalid uuid')
-        if node.getState() == state and modify_partition_table is False:
-            # no change
-            p = Errors.Ack('no change')
-            conn.answer(p)
-            return
-        # forward to primary master node
-        p = Packets.SetNodeState(uuid, state, modify_partition_table)
-        self.app.master_conn.ask(p, conn=conn, msg_id=conn.getPeerId())
-
     @check_primary_master
     def askClusterState(self, conn):
         conn.answer(Packets.AnswerClusterState(self.app.cluster_state))
@@ -80,6 +65,7 @@ class AdminEventHandler(EventHandler):
     addPendingNodes = forward_ask(Packets.AddPendingNodes)
     tweakPartitionTable = forward_ask(Packets.TweakPartitionTable)
     setClusterState = forward_ask(Packets.SetClusterState)
+    setNodeState = forward_ask(Packets.SetNodeState)
     checkReplicas = forward_ask(Packets.CheckReplicas)
 
 
diff --git a/neo/lib/protocol.py b/neo/lib/protocol.py
index 9582ef1b..0aee0cd1 100644
--- a/neo/lib/protocol.py
+++ b/neo/lib/protocol.py
@@ -26,7 +26,7 @@ except ImportError:
     pass
 
 # The protocol version (major, minor).
-PROTOCOL_VERSION = (12, 1)
+PROTOCOL_VERSION = (13, 1)
 
 # Size restrictions.
 MIN_PACKET_SIZE = 10
@@ -1135,7 +1135,6 @@ class SetNodeState(Packet):
     _fmt = PStruct('set_node_state',
         PUUID('uuid'),
         PFNodeState,
-        PBoolean('modify_partition_table'),
     )
 
     _answer = Error
diff --git a/neo/master/app.py b/neo/master/app.py
index 84b7419a..3c7a85cb 100644
--- a/neo/master/app.py
+++ b/neo/master/app.py
@@ -247,7 +247,7 @@ class Application(object):
 
         # send at most one non-empty notification packet per node
         for node in self.nm.getIdentifiedList():
-            node_list = node_dict.get(node.getType(), [])
+            node_list = node_dict.get(node.getType())
             if node_list and node.isRunning():
                 node.notify(Packets.NotifyNodeInformation(node_list))
 
diff --git a/neo/master/handlers/administration.py b/neo/master/handlers/administration.py
index 00fb83bf..47566fdc 100644
--- a/neo/master/handlers/administration.py
+++ b/neo/master/handlers/administration.py
@@ -20,8 +20,8 @@ from . import MasterHandler
 from ..app import StateChangedException
 from neo.lib import logging
 from neo.lib.pt import PartitionTableException
-from neo.lib.protocol import ClusterStates, NodeStates, Packets, ProtocolError
-from neo.lib.protocol import Errors, uuid_str
+from neo.lib.protocol import ClusterStates, Errors, \
+    NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
 from neo.lib.util import dump
 
 CLUSTER_STATE_WORKFLOW = {
@@ -32,6 +32,10 @@ CLUSTER_STATE_WORKFLOW = {
     ClusterStates.STOPPING_BACKUP: (ClusterStates.BACKINGUP,
                                     ClusterStates.STARTING_BACKUP),
 }
+NODE_STATE_WORKFLOW = {
+    NodeTypes.MASTER: (NodeStates.UNKNOWN,),
+    NodeTypes.STORAGE: (NodeStates.UNKNOWN, NodeStates.DOWN),
+}
 
 class AdministrationHandler(MasterHandler):
     """This class deals with messages from the admin node only"""
@@ -72,38 +76,24 @@ class AdministrationHandler(MasterHandler):
         if state != app.cluster_state:
             raise StateChangedException(state)
 
-    def setNodeState(self, conn, uuid, state, modify_partition_table):
-        logging.info("set node state for %s-%s : %s",
-                     uuid_str(uuid), state, modify_partition_table)
+    def setNodeState(self, conn, uuid, state):
+        logging.info("set node state for %s: %s", uuid_str(uuid), state)
         app = self.app
         node = app.nm.getByUUID(uuid)
         if node is None:
             raise ProtocolError('unknown node')
-
+        if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
+            raise ProtocolError('can not switch node to this state')
         if uuid == app.uuid:
-            node.setState(state)
-            # get message for self
-            if state != NodeStates.RUNNING:
-                p = Errors.Ack('node state changed')
-                conn.answer(p)
-                app.shutdown()
+            raise ProtocolError('can not kill primary master node')
 
-        if node.getState() == state:
-            # no change, just notify admin node
-            p = Errors.Ack('node already in %s state' % state)
-            conn.answer(p)
-            return
-
-        if state == NodeStates.RUNNING:
-            # first make sure to have a connection to the node
-            if not node.isConnected():
-                raise ProtocolError('no connection to the node')
-            node.setState(state)
-
-        elif state == NodeStates.DOWN and node.isStorage():
+        state_changed = state != node.getState()
+        message = ('state changed' if state_changed else
+                   'node already in %s state' % state)
+        if node.isStorage():
+            keep = state == NodeStates.UNKNOWN
             try:
-                cell_list = app.pt.dropNodeList([node],
-                    not modify_partition_table)
+                cell_list = app.pt.dropNodeList([node], keep)
             except PartitionTableException, e:
                 raise ProtocolError(str(e))
             node.setState(state)
@@ -112,17 +102,23 @@ class AdministrationHandler(MasterHandler):
                 node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
                 # close to avoid handle the closure as a connection lost
                 node.getConnection().abort()
-            if not modify_partition_table:
+            if keep:
                 cell_list = app.pt.outdate()
+            elif cell_list:
+                message = 'node permanently removed'
             app.broadcastPartitionChanges(cell_list)
-
         else:
             node.setState(state)
 
         # /!\ send the node information *after* the partition table change
-        p = Errors.Ack('state changed')
-        conn.answer(p)
-        app.broadcastNodesInformation([node])
+        conn.answer(Errors.Ack(message))
+        if state_changed:
+            # notify node explicitly because broadcastNodesInformation()
+            # ignores non-running nodes
+            assert not node.isRunning()
+            if node.isConnected():
+                node.notify(Packets.NotifyNodeInformation([node.asTuple()]))
+            app.broadcastNodesInformation([node])
 
     def addPendingNodes(self, conn, uuid_list):
         uuids = ', '.join(map(uuid_str, uuid_list))
diff --git a/neo/master/handlers/secondary.py b/neo/master/handlers/secondary.py
index 63de1e61..423907d8 100644
--- a/neo/master/handlers/secondary.py
+++ b/neo/master/handlers/secondary.py
@@ -14,10 +14,11 @@
 # You should have received a copy of the GNU General Public License
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
+import sys
 from . import MasterHandler
 from neo.lib.handler import EventHandler
 from neo.lib.exception import ElectionFailure, PrimaryFailure
-from neo.lib.protocol import NodeTypes, Packets, uuid_str
+from neo.lib.protocol import NodeStates, NodeTypes, Packets, uuid_str
 from neo.lib import logging
 
 class SecondaryMasterHandler(MasterHandler):
@@ -72,7 +73,8 @@ class PrimaryHandler(EventHandler):
             if node_type != NodeTypes.MASTER:
                 # No interest.
                 continue
-
+            if uuid == app.uuid and state == NodeStates.UNKNOWN:
+                sys.exit()
             # Register new master nodes.
             if app.server == addr:
                 # This is self.
diff --git a/neo/neoctl/app.py b/neo/neoctl/app.py
index fa4bbbd0..cdec9d80 100644
--- a/neo/neoctl/app.py
+++ b/neo/neoctl/app.py
@@ -17,7 +17,7 @@
 from operator import itemgetter
 from .neoctl import NeoCTL, NotReadyException
 from neo.lib.util import bin, p64
-from neo.lib.protocol import uuid_str, ClusterStates, NodeStates, NodeTypes, \
+from neo.lib.protocol import uuid_str, ClusterStates, NodeTypes, \
     UUID_NAMESPACES, ZERO_TID
 
 action_dict = {
@@ -28,7 +28,6 @@ action_dict = {
         'primary': 'getPrimary',
     },
     'set': {
-        'node': 'setNodeState',
         'cluster': 'setClusterState',
     },
     'check': 'checkReplicas',
@@ -36,6 +35,7 @@ action_dict = {
     'add': 'enableStorageList',
     'tweak': 'tweakPartitionTable',
     'drop': 'dropNode',
+    'kill': 'killNode',
 }
 
 uuid_int = (lambda ns: lambda uuid:
@@ -50,9 +50,6 @@ class TerminalNeoCTL(object):
         self.neoctl.close()
 
     # Utility methods (could be functions)
-    def asNodeState(self, value):
-        return getattr(NodeStates, value.upper())
-
     def asNodeType(self, value):
         return getattr(NodeTypes, value.upper())
 
@@ -121,25 +118,6 @@ class TerminalNeoCTL(object):
         assert len(params) == 0
         return str(self.neoctl.getClusterState())
 
-    def setNodeState(self, params):
-        """
-          Set node state, and allow (or not) updating partition table.
-          Parameters: node state [update]
-            node: node to modify
-            state: state to put the node in
-            update: disallow (0, default) or allow (other integer) partition
-                    table to be updated
-        """
-        assert len(params) in (2, 3)
-        node = self.asNode(params[0])
-        state = self.asNodeState(params[1])
-        if len(params) == 3:
-            update_partition_table = bool(int(params[2]))
-        else:
-            update_partition_table = False
-        return self.neoctl.setNodeState(node, state,
-            update_partition_table=update_partition_table)
-
     def setClusterState(self, params):
         """
           Set cluster state.
@@ -181,16 +159,19 @@ class TerminalNeoCTL(object):
         """
         return self.neoctl.tweakPartitionTable(map(self.asNode, params))
 
+    def killNode(self, params):
+        """
+          Kill redundant nodes (either a storage or a secondary master).
+          Parameters: node
+        """
+        return self.neoctl.killNode(self.asNode(*params))
+
     def dropNode(self, params):
         """
-          Set node into DOWN state.
+          Remove storage node permanently.
           Parameters: node
-            node: node the pu into DOWN state
-          Equivalent to:
-            set node state (node) DOWN
         """
-        assert len(params) == 1
-        return self.neoctl.dropNode(self.asNode(params[0]))
+        return self.neoctl.dropNode(self.asNode(*params))
 
     def getPrimary(self, params):
         """
diff --git a/neo/neoctl/neoctl.py b/neo/neoctl/neoctl.py
index ecc20660..dc79dc96 100644
--- a/neo/neoctl/neoctl.py
+++ b/neo/neoctl/neoctl.py
@@ -100,16 +100,11 @@ class NeoCTL(object):
             raise RuntimeError(response)
         return response[2]
 
-    def setNodeState(self, node, state, update_partition_table=False):
+    def _setNodeState(self, node, state):
         """
-          Set node state, and allow (or not) updating partition table.
+          Kill node, or remove it permanently
         """
-        if update_partition_table:
-            update_partition_table = 1
-        else:
-            update_partition_table = 0
-        packet = Packets.SetNodeState(node, state, update_partition_table)
-        response = self.__ask(packet)
+        response = self.__ask(Packets.SetNodeState(node, state))
         if response[0] != Packets.Error or response[1] != ErrorCodes.ACK:
             raise RuntimeError(response)
         return response[2]
@@ -151,12 +146,11 @@ class NeoCTL(object):
         """
         return self.setClusterState(ClusterStates.VERIFYING)
 
+    def killNode(self, node):
+        return self._setNodeState(node, NodeStates.UNKNOWN)
+
     def dropNode(self, node):
-        """
-          Set node into "down" state and remove it from partition table.
-        """
-        return self.setNodeState(node, NodeStates.DOWN,
-                update_partition_table=1)
+        return self._setNodeState(node, NodeStates.DOWN)
 
     def getPrimary(self):
         """
diff --git a/neo/storage/handlers/__init__.py b/neo/storage/handlers/__init__.py
index a0ca3200..3036799c 100644
--- a/neo/storage/handlers/__init__.py
+++ b/neo/storage/handlers/__init__.py
@@ -44,7 +44,7 @@ class BaseMasterHandler(EventHandler):
                 # This is me, do what the master tell me
                 logging.info("I was told I'm %s", state)
                 if state in (NodeStates.DOWN, NodeStates.TEMPORARILY_DOWN,
-                        NodeStates.BROKEN):
+                        NodeStates.BROKEN, NodeStates.UNKNOWN):
                     erase = state == NodeStates.DOWN
                     self.app.shutdown(erase=erase)
                 elif state == NodeStates.HIDDEN:
diff --git a/neo/tests/functional/__init__.py b/neo/tests/functional/__init__.py
index 5664e850..03423c57 100644
--- a/neo/tests/functional/__init__.py
+++ b/neo/tests/functional/__init__.py
@@ -629,6 +629,12 @@ class NEOCluster(object):
             return current_try, current_try
         self.expectCondition(callback, *args, **kw)
 
+    def expectDead(self, process, *args, **kw):
+        def callback(last_try):
+            current_try = not process.isAlive()
+            return current_try, current_try
+        self.expectCondition(callback, *args, **kw)
+
     def expectStorageNotKnown(self, process, *args, **kw):
         # /!\ Not Known != Unknown
         process_uuid = process.getUUID()
diff --git a/neo/tests/functional/testMaster.py b/neo/tests/functional/testMaster.py
index 471acf8d..0f946310 100644
--- a/neo/tests/functional/testMaster.py
+++ b/neo/tests/functional/testMaster.py
@@ -41,12 +41,14 @@ class MasterTests(NEOFunctionalTest):
         self.neo.expectAllMasters(MASTER_NODE_COUNT)
 
         # Kill
-        killed_uuid_list = self.neo.killSecondaryMaster()
-        # Test sanity check.
-        self.assertEqual(len(killed_uuid_list), 1)
-        uuid = killed_uuid_list[0]
-        # Check node state has changed.
-        self.neo.expectMasterState(uuid, None)
+        primary_uuid = self.neoctl.getPrimary()
+        for master in self.neo.getMasterProcessList():
+            uuid = master.getUUID()
+            if uuid != primary_uuid:
+                break
+        self.neo.neoctl.killNode(uuid)
+        self.neo.expectDead(master)
+        self.assertRaises(RuntimeError, self.neo.neoctl.killNode, primary_uuid)
 
     def testStoppingPrimaryWithTwoSecondaries(self):
         # Wait for masters to stabilize
diff --git a/neo/tests/functional/testStorage.py b/neo/tests/functional/testStorage.py
index 03054169..3fc65366 100644
--- a/neo/tests/functional/testStorage.py
+++ b/neo/tests/functional/testStorage.py
@@ -179,13 +179,15 @@ class StorageTests(NEOFunctionalTest):
         self.neo.expectRunning(started[2])
         self.neo.expectOudatedCells(number=0)
 
-        started[0].stop()
+        self.neo.neoctl.killNode(started[0].getUUID())
         # Cluster still operational. All cells of first storage should be
         # outdated.
         self.neo.expectUnavailable(started[0])
         self.neo.expectOudatedCells(2)
         self.neo.expectClusterRunning()
 
+        self.assertRaises(RuntimeError, self.neo.neoctl.killNode,
+            started[1].getUUID())
         started[1].stop()
         # Cluster not operational anymore. Only cells of second storage that
         # were shared with the third one should become outdated.
-- 
2.30.9