#
# Copyright (C) 2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

18
from neo import logging
19 20

from neo.handler import EventHandler
21
from neo.node import StorageNode
22
from neo import protocol
Aurel's avatar
Aurel committed
23
from neo.exception import PrimaryFailure
24 25
from neo.util import dump

26
class AdminEventHandler(EventHandler):
    """This class deals with events for administrating cluster.

    Requests that can be served from the state held by the application
    (partition table, node manager, cluster state) are answered directly.
    The other ones are forwarded to the primary master node; the pending
    request is then recorded in the application's dispatcher together with
    the identifier of the original packet, so that the reply can later be
    sent back on the requesting connection.
    """

    def __notConnected(self, conn, packet):
        """Answer `packet` with a notReady error: no primary master."""
        conn.answer(protocol.notReady('Not connected to a primary master.'),
                    packet.getId())

    def __forwardToMaster(self, conn, packet, build_request, *args, **kw):
        """Forward a request to the primary master on behalf of `conn`.

        conn          -- connection the original request came from
        packet        -- original request; its id is stored under the
                         'msg_id' key of the registered context
        build_request -- callable returning the packet to send to the
                         master; it is only called when a master connection
                         exists
        args          -- positional arguments given to `build_request`
        kw            -- extra entries stored in the context registered in
                         the dispatcher

        When there is no master connection, the requester is answered with
        a notReady error and nothing is registered.
        """
        master_conn = self.app.master_conn
        if master_conn is None:
            self.__notConnected(conn, packet)
            return
        msg_id = master_conn.ask(build_request(*args))
        kw['msg_id'] = packet.getId()
        self.app.dispatcher.register(msg_id, conn, kw)

    def handleAskPartitionList(self, conn, packet, min_offset, max_offset, uuid):
        """Send the requested part of the partition table to `conn`.

        If the application has no partition table yet, it is asked to the
        primary master first and the request parameters are kept in the
        dispatcher until the answer comes back.
        """
        logging.info("ask partition list from %s to %s for %s" %(min_offset, max_offset, dump(uuid)))
        app = self.app
        if app.pt is not None:
            app.sendPartitionTable(conn, min_offset, max_offset, uuid,
                                   packet.getId())
            return
        # No partition table yet: ask it to the primary master node.
        self.__forwardToMaster(conn, packet, protocol.askPartitionTable, [],
                               min_offset=min_offset,
                               max_offset=max_offset,
                               uuid=uuid)

    def handleAskNodeList(self, conn, packet, node_type):
        """Answer with the known nodes whose type is `node_type`.

        Each entry is (node type, (ip, port), uuid, state).
        """
        logging.info("ask node list for %s" %(node_type))
        def node_filter(n):
            return n.getNodeType() is node_type
        node_information_list = []
        for node in self.app.nm.getNodeList(node_filter):
            try:
                ip, port = node.getServer()
            except TypeError:
                # getServer() did not return an unpackable address:
                # report a null one instead.
                ip = "0.0.0.0"
                port = 0
            node_information_list.append((node.getNodeType(), (ip, port),
                                          node.getUUID(), node.getState()))
        p = protocol.answerNodeList(node_information_list)
        conn.answer(p, packet.getId())

    def handleSetNodeState(self, conn, packet, uuid, state, modify_partition_table):
        """Change the state of the node identified by `uuid`.

        An unknown uuid is answered with a protocol error. A request that
        would change nothing (same state and `modify_partition_table` is
        False) is answered immediately with the current node state. Any
        other request is forwarded to the primary master node.
        """
        logging.info("set node state for %s-%s" %(dump(uuid), state))
        node = self.app.nm.getNodeByUUID(uuid)
        if node is None:
            p = protocol.protocolError('invalid uuid')
            # Answer with the id of the request, like every other reply
            # sent to the requester, so the error can be matched to it.
            conn.answer(p, packet.getId())
            return
        if node.getState() == state and modify_partition_table is False:
            # no change
            p = protocol.answerNodeState(node.getUUID(), node.getState())
            conn.answer(p, packet.getId())
            return
        # forward to primary master node
        self.__forwardToMaster(conn, packet, protocol.setNodeState,
                               uuid, state, modify_partition_table)

    def handleSetClusterState(self, conn, packet, state):
        """Forward a cluster state change request to the primary master."""
        self.__forwardToMaster(conn, packet, protocol.setClusterState, state)

    def handleAddPendingNodes(self, conn, packet, uuid_list):
        """Forward an 'add pending nodes' request to the primary master."""
        logging.info('Add nodes %s' % [dump(uuid) for uuid in uuid_list])
        self.__forwardToMaster(conn, packet, protocol.addPendingNodes,
                               uuid_list)

    def handleAskClusterState(self, conn, packet):
        """Answer with the cluster state, asking the master if unknown."""
        cluster_state = self.app.cluster_state
        if cluster_state is None:
            # Not known yet: it must be requested from the primary master.
            self.__forwardToMaster(conn, packet, protocol.askClusterState)
            return
        conn.answer(protocol.answerClusterState(cluster_state),
                    packet.getId())
121

122 123

class MasterEventHandler(EventHandler):
    """This class is just used to dispatch messages to the right handler.

    It also reacts to the loss of the connection to the primary master.
    """

    def _connectionLost(self, conn):
        """Forget the primary master and raise PrimaryFailure.

        The application no longer references the master connection, the
        master node or its own uuid, and the node matching the connection
        is marked as temporarily down.
        """
        application = self.app
        assert application.master_conn is conn
        # Targets of a chained assignment are assigned left to right.
        application.master_conn = application.master_node = \
            application.uuid = None
        master = application.nm.getNodeByUUID(conn.getUUID())
        assert master is not None
        master.setState(protocol.TEMPORARILY_DOWN_STATE)
        raise PrimaryFailure

    def connectionFailed(self, conn):
        """Treat a failed connection as a lost primary master."""
        self._connectionLost(conn)

    def timeoutExpired(self, conn):
        """Treat an expired timeout as a lost primary master."""
        self._connectionLost(conn)

    def connectionClosed(self, conn):
        """Treat a closed connection as a lost primary master."""
        self._connectionLost(conn)

    def peerBroken(self, conn):
        """Treat a broken peer as a lost primary master."""
        self._connectionLost(conn)

    def dispatch(self, conn, packet):
        """Route `packet` to the handler in charge of it."""
        application = self.app
        if not packet.isResponse():
            # Not an answer: this is for the monitoring handler.
            application.monitoring_handler.dispatch(conn, packet)
            return
        if application.dispatcher.registered(packet.getId()):
            # Expected answer: this is for the request handler.
            application.request_handler.dispatch(conn, packet)
            return
        # Unexpected answer, this should be answerNodeInformation or
        # answerPartitionTable from the master node during initialization.
        # This will no longer exist once the initialization module is
        # implemented to factorize code (as done for bootstrap).
        EventHandler.dispatch(self, conn, packet)

    def handleAnswerNodeInformation(self, conn, packet, node_list):
        """Only log that the packet was received."""
        logging.debug("handleAnswerNodeInformation")

    def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
        """Only log that the packet was received."""
        logging.debug("handleAnswerPartitionTable")

    def handleNotifyClusterInformation(self, conn, packet, cluster_state):
        """Only log that the packet was received."""
        logging.debug("handleNotifyClusterInformation")
171 172 173 174 175


class MasterBaseEventHandler(EventHandler):
    """Base class of the handlers for the connection to the primary master.

    It records on the application the notifications shared by its
    subclasses.
    """

    def handleNotifyClusterInformation(self, conn, packet, cluster_state):
        """Remember the cluster state announced by the master."""
        application = self.app
        application.cluster_state = cluster_state

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Update the node manager and flag the application as notified."""
        application = self.app
        application.nm.update(node_list)
        application.notified = True


class MasterRequestEventHandler(MasterBaseEventHandler):
    """This class handles all answers from the primary master node.

    Each answer matches a request previously registered in the
    application's dispatcher; the registered entry gives the connection
    to reply to and the id of the original request.
    """

    def __answerNeoCTL(self, msg_id, packet):
        """Send `packet` to the connection registered for `msg_id`."""
        pending = self.app.dispatcher.pop(msg_id)
        requester, context = pending
        requester.answer(packet, context['msg_id'])

    def handleAnswerClusterState(self, conn, packet, state):
        """Store the cluster state, then pass it on to the requester."""
        logging.info("handleAnswerClusterState for a conn")
        self.app.cluster_state = state
        msg_id = packet.getId()
        reply = protocol.answerClusterState(state)
        self.__answerNeoCTL(msg_id, reply)

    def handleAnswerNewNodes(self, conn, packet, uuid_list):
        """Pass the list of new nodes on to the requester."""
        logging.info("handleAnswerNewNodes for a conn")
        msg_id = packet.getId()
        reply = protocol.answerNewNodes(uuid_list)
        self.__answerNeoCTL(msg_id, reply)

    def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
        """Send the partition table to the connection that asked for it.

        The parameters registered with the request are given as keyword
        arguments to the application's sendPartitionTable.
        """
        logging.info("handleAnswerPartitionTable for a conn")
        requester, context = self.app.dispatcher.pop(packet.getId())
        # send the partition table to the client
        self.app.sendPartitionTable(requester, **context)

    def handleAnswerNodeState(self, conn, packet, uuid, state):
        """Pass the node state on to the requester."""
        msg_id = packet.getId()
        reply = protocol.answerNodeState(uuid, state)
        self.__answerNeoCTL(msg_id, reply)

    def handleNoError(self, conn, packet, msg):
        """Pass a 'no error' acknowledgement on to the requester."""
        msg_id = packet.getId()
        reply = protocol.noError(msg)
        self.__answerNeoCTL(msg_id, reply)

    def handleProtocolError(self, conn, packet, msg):
        """Pass a protocol error on to the requester."""
        msg_id = packet.getId()
        reply = protocol.protocolError(msg)
        self.__answerNeoCTL(msg_id, reply)
Aurel's avatar
Aurel committed
217

218

219 220 221
class MasterMonitoringEventHandler(MasterBaseEventHandler):
    """This class deals with events for monitoring cluster."""

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        """Apply partition table changes notified by the master.

        Changes carrying a ptid lower than the one already known by the
        application are ignored.
        """
        app = self.app
        if ptid < app.ptid:
            # Ignore this packet.
            return
        app.ptid = ptid
        app.pt.update(ptid, cell_list, app.nm)

    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        """Load rows of the partition table sent by the master.

        ptid     -- id of the partition table the rows belong to; when it
                    differs from the one known by the application, the
                    current table is cleared before the rows are loaded
        row_list -- iterable of (offset, row), each row being an iterable
                    of (uuid, state) cells

        Nodes referenced by a cell but unknown to the node manager are
        added to it as storage nodes in the temporarily down state.
        """
        app = self.app
        nm = app.nm
        pt = app.pt
        if app.ptid != ptid:
            app.ptid = ptid
            pt.clear()
        for offset, row in row_list:
            for uuid, state in row:
                node = nm.getNodeByUUID(uuid)
                if node is None:
                    node = StorageNode(uuid = uuid)
                    node.setState(protocol.TEMPORARILY_DOWN_STATE)
                    nm.add(node)
                pt.setCell(offset, node, state)
        pt.log()
248