administration.py 9.79 KB
Newer Older
1
#
# Copyright (C) 2006-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16

17
import random
18

19
from . import MasterHandler
20
from ..app import monotonic_time, StateChangedException
21
from neo.lib import logging
22
from neo.lib.exception import StoppedOperation
23
from neo.lib.pt import PartitionTableException
24 25
from neo.lib.protocol import ClusterStates, Errors, \
    NodeStates, NodeTypes, Packets, ProtocolError, uuid_str
26
from neo.lib.util import dump
27

28 29
# Cluster state transitions, keyed by destination state: each value lists
# the source states from which that destination may be reached.
CLUSTER_STATE_WORKFLOW = {
    ClusterStates.VERIFYING: (
        ClusterStates.RECOVERING,
    ),
    ClusterStates.STARTING_BACKUP: (
        ClusterStates.RUNNING,
        ClusterStates.STOPPING_BACKUP,
    ),
    ClusterStates.STOPPING_BACKUP: (
        ClusterStates.BACKINGUP,
        ClusterStates.STARTING_BACKUP,
    ),
}
36
# Node states that may be requested, keyed by the type of the target node.
# A node type missing from this mapping has no allowed state.
NODE_STATE_WORKFLOW = {
    NodeTypes.MASTER: (
        NodeStates.DOWN,
    ),
    NodeTypes.STORAGE: (
        NodeStates.DOWN,
        NodeStates.UNKNOWN,
    ),
}
40

41
class AdministrationHandler(MasterHandler):
42 43
    """This class deals with messages from the admin node only"""

44
    def connectionLost(self, conn, new_state):
        """Forget the node associated with a connection that was lost.

        The node is looked up in the node manager by the UUID of `conn`
        and removed from it when found. `new_state` is not used here.
        """
        lost_node = self.app.nm.getByUUID(conn.getUUID())
        if lost_node is None:
            # nothing registered under this UUID
            return
        self.app.nm.remove(lost_node)
48

49
    def setClusterState(self, conn, state):
        """Process a request to switch the cluster to `state`.

        The request is first validated against CLUSTER_STATE_WORKFLOW:
        a destination listed there is only accepted from one of its source
        states, and the only accepted destination that is not listed is
        STOPPING. A ProtocolError is raised when the request is rejected.

        Once accepted, the request is acknowledged on `conn`, then
        StateChangedException is raised if the resulting state differs from
        the current cluster state.
        """
        app = self.app

        # check request
        allowed_sources = CLUSTER_STATE_WORKFLOW.get(state)
        if allowed_sources is None:
            if state != ClusterStates.STOPPING:
                raise ProtocolError('Invalid state requested')
        elif app.cluster_state not in allowed_sources:
            raise ProtocolError('Can not switch to this state')

        # change state
        if state == ClusterStates.VERIFYING:
            identified = app.nm.getStorageList(only_identified=True)
            if not identified:
                raise ProtocolError('Cannot exit recovery without any '
                    'storage node')
            for storage in identified:
                assert storage.isPending(), storage
                if storage.getConnection().isPending():
                    # XXX: It's wrong to use ProtocolError here. We must reply
                    #      less aggressively because the admin has no way to
                    #      know that there's still pending activity.
                    raise ProtocolError('Cannot exit recovery now: node %r is '
                        'entering cluster' % (storage, ))
            # Only flag that startup is allowed: the state is reset to the
            # current one so that no StateChangedException is raised below.
            app._startup_allowed = True
            state = app.cluster_state
        elif state == ClusterStates.STARTING_BACKUP and (
                app.tm.hasPending() or app.nm.getClientList(True)):
            raise ProtocolError("Can not switch to %s state with pending"
                " transactions or connected clients" % state)

        conn.answer(Errors.Ack('Cluster state changed'))
        if state != app.cluster_state:
            raise StateChangedException(state)
Aurel's avatar
Aurel committed
83

84 85
    def setNodeState(self, conn, uuid, state):
        """Process a request to change the state of the node `uuid`.

        The request is rejected with a ProtocolError when:
        - no node is known under `uuid`;
        - `state` is not listed in NODE_STATE_WORKFLOW for the node type;
        - `uuid` is the UUID of this application (primary master);
        - the partition table refuses to drop the node
          (PartitionTableException, reported with its message).

        For a storage node, the partition table is updated and the changes
        are broadcast before the request is acknowledged on `conn`. Node
        information is only sent after the acknowledgement, and only if the
        state actually changed.
        """
        logging.info("set node state for %s: %s", uuid_str(uuid), state)
        app = self.app
        node = app.nm.getByUUID(uuid)
        if node is None:
            raise ProtocolError('unknown node')
        if state not in NODE_STATE_WORKFLOW.get(node.getType(), ()):
            raise ProtocolError('can not switch node to this state')
        if uuid == app.uuid:
            raise ProtocolError('can not kill primary master node')

        state_changed = state != node.getState()
        message = ('state changed' if state_changed else
                   'node already in %s state' % state)
        if node.isStorage():
            # Passed to dropNodeList(): true only when the node goes DOWN.
            keep = state == NodeStates.DOWN
            try:
                cell_list = app.pt.dropNodeList([node], keep)
            except PartitionTableException as e:
                raise ProtocolError(str(e))
            node.setState(state)
            if node.isConnected():
                # notify itself so it can shutdown
                node.send(Packets.NotifyNodeInformation(
                    monotonic_time(), [node.asTuple()]))
                # close to avoid handle the closure as a connection lost
                node.getConnection().abort()
            if keep:
                # The changes to broadcast are those reported by outdate(),
                # which replace the result of dropNodeList().
                cell_list = app.pt.outdate()
            elif cell_list:
                message = 'node permanently removed'
            app.broadcastPartitionChanges(cell_list)
        else:
            node.setState(state)

        # /!\ send the node information *after* the partition table change
        conn.answer(Errors.Ack(message))
        if state_changed:
            # notify node explicitly because broadcastNodesInformation()
            # ignores non-running nodes
            assert not node.isRunning()
            if node.isConnected():
                node.send(Packets.NotifyNodeInformation(
                    monotonic_time(), [node.asTuple()]))
            app.broadcastNodesInformation([node])
129

130
    def addPendingNodes(self, conn, uuid_list):
        """Add pending storage nodes listed in `uuid_list` to the cluster.

        Only allowed while the cluster is RUNNING, STARTING_BACKUP or
        BACKINGUP; a ProtocolError is raised otherwise. The pending storage
        nodes whose UUID is in `uuid_list` are submitted to the partition
        table; those it returns are set running, started and broadcast.
        `conn` is always answered with an Ack telling which nodes were
        added, if any.
        """
        uuids = ', '.join(map(uuid_str, uuid_list))
        logging.debug('Add nodes %s', uuids)
        app = self.app
        cluster_state = app.getClusterState()
        # XXX: Would it be safe to allow more states ?
        allowed_states = (ClusterStates.RUNNING,
                          ClusterStates.STARTING_BACKUP,
                          ClusterStates.BACKINGUP)
        if cluster_state not in allowed_states:
            raise ProtocolError('Can not add nodes in %s state'
                                % cluster_state)
        # take all pending nodes
        candidates = (storage
            for storage in app.nm.getStorageList()
            if storage.isPending() and storage.getUUID() in uuid_list)
        added = list(app.pt.addNodeList(candidates))
        if not added:
            logging.warning('No node added')
            conn.answer(Errors.Ack('No node added'))
            return
        for storage in added:
            storage.setRunning()
            app.startStorage(storage)
        app.broadcastNodesInformation(added)
        conn.answer(Errors.Ack('Nodes added: %s' %
            ', '.join(uuid_str(storage.getUUID()) for storage in added)))

155 156 157 158 159 160 161 162 163 164
    def repair(self, conn, uuid_list, *args):
        """Forward a repair notification to the given storage nodes.

        Every UUID of `uuid_list` must designate a known, identified storage
        node, otherwise a ProtocolError is raised and nothing is sent.
        `args` are passed as is to Packets.NotifyRepair and the resulting
        packet is sent to each node before `conn` is acknowledged.
        """
        lookup = self.app.nm.getByUUID
        targets = []
        # Validate the whole list first, so that no packet is sent if any
        # entry is invalid.
        for uuid in uuid_list:
            storage = lookup(uuid)
            if storage is None or not storage.isStorage() \
                    or not storage.isIdentified():
                raise ProtocolError("invalid storage node %s" % uuid_str(uuid))
            targets.append(storage)
        packet = Packets.NotifyRepair(*args)
        for storage in targets:
            storage.send(packet)
        conn.answer(Errors.Ack(''))

168 169 170 171 172 173 174 175 176 177 178 179
    def tweakPartitionTable(self, conn, uuid_list):
        """Tweak the partition table and broadcast the resulting changes.

        Only allowed while the cluster is RUNNING, STARTING_BACKUP or
        BACKINGUP; a ProtocolError is raised otherwise. The nodes found for
        `uuid_list` are handed to the partition table's tweak(), whose
        result is broadcast before `conn` is acknowledged.
        """
        app = self.app
        cluster_state = app.getClusterState()
        # XXX: Would it be safe to allow more states ?
        allowed_states = (ClusterStates.RUNNING,
                          ClusterStates.STARTING_BACKUP,
                          ClusterStates.BACKINGUP)
        if cluster_state not in allowed_states:
            raise ProtocolError('Can not tweak partition table in %s state'
                                % cluster_state)
        node_list = map(app.nm.getByUUID, uuid_list)
        changes = app.pt.tweak(node_list)
        app.broadcastPartitionChanges(changes)
        conn.answer(Errors.Ack(''))
180

181 182 183 184 185 186 187
    def truncate(self, conn, tid):
        """Process a request to truncate the database at `tid`.

        Rejected with a ProtocolError unless the cluster is RUNNING.
        Otherwise `conn` is acknowledged and StoppedOperation(tid) is
        raised, leaving the actual handling to whoever catches it.
        """
        if self.app.cluster_state != ClusterStates.RUNNING:
            raise ProtocolError('Can not truncate in this state')
        conn.answer(Errors.Ack(''))
        raise StoppedOperation(tid)

188 189 190
    def checkReplicas(self, conn, partition_dict, min_tid, max_tid):
        """Ask storage nodes to check partitions between two TIDs.

        `partition_dict` maps a partition offset to a source, which is
        either the UUID of the node to use as reference or a false value to
        let this method pick one. When `max_tid` is false, it defaults to
        the partition table's check TID if a backup TID is set, else to the
        last transaction. One CheckPartition packet is sent per checked
        partition and `conn` is acknowledged in all cases.
        """
        app = self.app
        pt = app.pt
        backingup = bool(app.backup_tid)
        if not max_tid:
            if backingup:
                max_tid = pt.getCheckTid(partition_dict)
            else:
                max_tid = app.getLastTransaction()
        if min_tid > max_tid:
            logging.warning("nothing to check: min_tid=%s > max_tid=%s",
                            dump(min_tid), dump(max_tid))
            conn.answer(Errors.Ack(''))
            return

        getByUUID = app.nm.getByUUID
        selected_nodes = set()
        for offset, source in partition_dict.iteritems():
            # XXX: For the moment, code checking replicas is unable to fix
            #      corrupted partitions (when a good cell is known)
            #      so only check readable ones.
            #      (see also Checker._nextPartition of storage)
            #      The alternative would be to take all cells of the
            #      partition that are not out-of-date.
            cell_list = pt.getCellList(offset, True)
            # When backing up without an explicit source, count one more
            # candidate besides the cells of this cluster.
            extra = 1 if backingup and not source else 0
            if len(cell_list) + extra <= 1:
                continue
            # Pick the first node that was already selected for a previous
            # partition; failing that, take the node of the last cell and
            # remember it.
            for cell in cell_list:
                node = cell.getNode()
                if node in selected_nodes:
                    break
            else:
                selected_nodes.add(node)
            if source:
                source = '', getByUUID(source).getAddress()
            else:
                readable = [cell for cell in cell_list if cell.isReadable()]
                if len(readable) == 1 and len(readable) < len(cell_list):
                    source = '', readable[0].getAddress()
                elif backingup:
                    upstream_cell = random.choice(
                        app.backup_app.pt.getCellList(offset, readable=True))
                    source = app.backup_app.name, upstream_cell.getAddress()
                else:
                    source = '', None
            node.send(Packets.CheckPartition(
                offset, source, min_tid, max_tid))
        conn.answer(Errors.Ack(''))