#
# Copyright (C) 2006-2009  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

18
from neo import logging
19

20
from neo import protocol
21
from neo.protocol import NodeTypes, NodeStates, Packets
22
from neo.master.handlers import MasterHandler
23
from neo.exception import ElectionFailure
24

25
class ElectionHandler(MasterHandler):
    """Common event handling shared by both sides of a primary master
    election."""

    def notifyNodeInformation(self, conn, packet, node_list):
        """Update the state of known master nodes from a peer notification.

        node_list is iterated as (node_type, address, uuid, state) tuples.
        Entries that are not master nodes, or that describe this node
        itself, are skipped.

        Raises protocol.ProtocolError if the connection has no UUID yet.
        """
        if conn.getUUID() is None:
            raise protocol.ProtocolError('Not identified')

        app = self.app
        for node_type, addr, peer_uuid, state in node_list:
            # Only other master nodes are of interest here.
            if node_type != NodeTypes.MASTER or app.server == addr:
                continue

            node = app.nm.getByAddress(addr)
            # The master must be known
            assert node is not None

            # Adopt the advertised UUID only when none is recorded yet.
            if peer_uuid is not None and node.getUUID() is None:
                node.setUUID(peer_uuid)

            if state in (node.getState(), NodeStates.RUNNING):
                # Nothing changed, or the node is reported running: ignore.
                continue

            # Something possibly went wrong with that node: drop every
            # connection carrying its UUID, then record the new state.
            # XXX this can be very slow.
            for connection in app.em.getConnectionList():
                if connection.getUUID() == peer_uuid:
                    connection.close()
            node.setState(state)

65
class ClientElectionHandler(ElectionHandler):
    """Election handler for the asking side of a master-to-master link.

    Once a connection completes, this handler sends AskPrimary, and on
    AnswerPrimary it sends RequestIdentification. It also keeps the
    application's unconnected/negotiating master address sets up to date
    as connections start, fail or get identified.
    """

    def packetReceived(self, conn, packet):
        """Mark the peer node as running (unless broken), then delegate."""
        node = self.app.nm.getByAddress(conn.getAddress())
        if not node.isBroken():
            node.setRunning()
        MasterHandler.packetReceived(self, conn, packet)

    def connectionStarted(self, conn):
        """Move the peer address from the unconnected to the negotiating
        set.

        Raises KeyError (from set.remove) if the address is not in the
        unconnected set.
        """
        app = self.app
        addr = conn.getAddress()
        app.unconnected_master_node_set.remove(addr)
        app.negotiating_master_node_set.add(addr)
        MasterHandler.connectionStarted(self, conn)

    def connectionCompleted(self, conn):
        """Ask the peer which node is the primary master."""
        conn.ask(Packets.AskPrimary())
        MasterHandler.connectionCompleted(self, conn)

    def connectionClosed(self, conn):
        """Treat a closed connection like a failed one, then delegate."""
        self.connectionFailed(conn)
        MasterHandler.connectionClosed(self, conn)

    def timeoutExpired(self, conn):
        """Treat a timeout like a failed connection, then delegate."""
        self.connectionFailed(conn)
        MasterHandler.timeoutExpired(self, conn)

    def connectionFailed(self, conn):
        """Stop negotiating with the peer and schedule it for a retry.

        The address goes back to the unconnected set when the node is
        running (it is then marked temporarily down) or is already
        temporarily down. In any other state it is not re-queued.
        """
        app = self.app
        addr = conn.getAddress()
        app.negotiating_master_node_set.discard(addr)
        node = app.nm.getByAddress(addr)
        if node.isRunning():
            app.unconnected_master_node_set.add(addr)
            node.setTemporarilyDown()
        elif node.isTemporarilyDown():
            app.unconnected_master_node_set.add(addr)
        MasterHandler.connectionFailed(self, conn)

    def peerBroken(self, conn):
        """Mark the peer node down (if known) and stop negotiating with
        it."""
        app = self.app
        addr = conn.getAddress()
        node = app.nm.getByAddress(addr)
        if node is not None:
            node.setDown()
        app.negotiating_master_node_set.discard(addr)
        MasterHandler.peerBroken(self, conn)

    def acceptIdentification(self, conn, packet, node_type,
            uuid, num_partitions, num_replicas, your_uuid):
        """Handle the peer's acceptance of our identification.

        A peer that is not a master node is removed from the node manager
        and its connection is closed. If the peer assigned us a different
        UUID (your_uuid), that UUID is adopted and ElectionFailure is
        raised. Otherwise the peer UUID is recorded on the connection and
        on the node, this node gives up primary status when its UUID is
        lower than the peer's, and the peer is dropped from the
        negotiating set.

        Raises ElectionFailure when a new UUID was supplied by the peer.
        """
        app = self.app
        node = app.nm.getByAddress(conn.getAddress())
        if node_type != NodeTypes.MASTER:
            # The peer is not a master node!
            # The address is unpacked so that each placeholder gets its
            # own argument (assumes a (host, port) pair, as the format
            # string implies).
            logging.error('%s:%d is not a master node', *conn.getAddress())
            app.nm.remove(node)
            app.negotiating_master_node_set.discard(node.getAddress())
            conn.close()
            return

        if your_uuid != app.uuid:
            # uuid conflict happened, accept the new one and restart election
            app.uuid = your_uuid
            raise ElectionFailure('new uuid supplied')

        conn.setUUID(uuid)
        node.setUUID(uuid)

        if app.uuid < uuid:
            # I lost.
            app.primary = False

        app.negotiating_master_node_set.discard(conn.getAddress())

    def answerPrimary(self, conn, packet, primary_uuid, known_master_list):
        """Handle the peer's answer about the primary master.

        Records the UUIDs advertised in known_master_list, which is
        iterated as (address, uuid) pairs. If the peer names a primary
        that is known to the node manager, it is trusted: this node gives
        up primary status and both master address sets are reduced to the
        primary's address. Finally, identification is requested from the
        peer.

        Raises ElectionFailure if a different primary master is already
        recorded.
        """
        if conn.getConnector() is None:
            # Connection can be closed by peer after he sent
            # AnswerPrimary if he finds the primary master before we
            # give him our UUID.
            # The connection gets closed before this message gets processed
            # because this message might have been queued, but connection
            # interruption takes effect as soon as received.
            return
        app = self.app
        # Register new master nodes.
        for address, uuid in known_master_list:
            if app.server == address:
                # This is self.
                continue
            n = app.nm.getByAddress(address)
            # master node must be known
            assert n is not None

            # Believe what the peer told me at the moment: the advertised
            # UUID replaces the stored one whenever they differ, which
            # includes the case where none is stored yet.
            if uuid is not None and n.getUUID() != uuid:
                n.setUUID(uuid)

        if primary_uuid is not None:
            # The primary master is defined.
            if app.primary_master_node is not None \
                    and app.primary_master_node.getUUID() != primary_uuid:
                # There are multiple primary master nodes. This is
                # dangerous.
                raise ElectionFailure('multiple primary master nodes')
            primary_node = app.nm.getByUUID(primary_uuid)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                logging.warning('received an unknown primary node UUID')
            elif primary_node.getUUID() == primary_uuid:
                # Whatever the situation is, I trust this master.
                app.primary = False
                app.primary_master_node = primary_node
                # Stop waiting for connections other than the primary
                # master's to complete, to exit election phase ASAP.
                primary_server = primary_node.getAddress()
                app.unconnected_master_node_set = set([primary_server])
                app.negotiating_master_node_set = set([primary_server])

        # Request a node identification.
        conn.ask(Packets.RequestIdentification(
            NodeTypes.MASTER,
            app.uuid,
            app.server,
            app.name
        ))
194

195

196
class ServerElectionHandler(ElectionHandler):
    """Election handler for the answering side of a master-to-master link.

    It answers RequestIdentification from other masters and reacts to
    primary announcements and re-election requests.
    """

    def reelectPrimary(self, conn, packet):
        """Abort the current election on a peer's request.

        Always raises ElectionFailure.
        """
        raise ElectionFailure('reelection requested')

    def peerBroken(self, conn):
        """Mark the peer node broken, provided it is known and has a
        UUID, then delegate."""
        app = self.app
        addr = conn.getAddress()
        node = app.nm.getByAddress(addr)
        if node is not None and node.getUUID() is not None:
            node.setBroken()
        MasterHandler.peerBroken(self, conn)

    def requestIdentification(self, conn, packet, node_type,
                                        uuid, address, name):
        """Identify a peer master and answer with AcceptIdentification.

        The cluster name is checked first. If the proposed UUID is not
        valid for this address, a new one is generated. The resulting
        UUID is stored on the node and the connection, and sent back to
        the peer in the answer.

        Raises:
            protocol.NotReadyError: the peer is not a master node.
            protocol.ProtocolError: no node is known at this address.
            protocol.BrokenNodeDisallowedError: the node carries this
                UUID and is marked broken.
        """
        if conn.getConnector() is None:
            # Connection can be closed by peer after he sent
            # RequestIdentification if he finds the primary master before
            # we answer him.
            # The connection gets closed before this message gets processed
            # because this message might have been queued, but connection
            # interruption takes effect as soon as received.
            return
        self.checkClusterName(name)
        app = self.app
        if node_type != NodeTypes.MASTER:
            logging.info('reject a connection from a non-master')
            raise protocol.NotReadyError
        node = app.nm.getByAddress(address)
        if node is None:
            logging.error('unknown master node: %s' % (address, ))
            raise protocol.ProtocolError('unknown master node')
        # If this node is broken, reject it.
        if node.getUUID() == uuid:
            if node.isBroken():
                raise protocol.BrokenNodeDisallowedError

        # supplied another uuid in case of conflict
        while not app.isValidUUID(uuid, address):
            uuid = app.getNewUUID(node_type)

        node.setUUID(uuid)
        conn.setUUID(uuid)

        p = Packets.AcceptIdentification(
            NodeTypes.MASTER,
            app.uuid,
            app.pt.getPartitions(),
            app.pt.getReplicas(),
            uuid
        )
        conn.answer(p, packet.getId())

    def announcePrimary(self, conn, packet):
        """Accept the peer's announcement that it is the primary master.

        Raises:
            protocol.ProtocolError: the connection has no UUID yet.
            ElectionFailure: this node also considers itself primary.
        """
        uuid = conn.getUUID()
        if uuid is None:
            raise protocol.ProtocolError('Not identified')
        app = self.app
        if app.primary:
            # I am also the primary... So restart the election.
            raise ElectionFailure('another primary arises')
        node = app.nm.getByUUID(uuid)
        app.primary = False
        app.primary_master_node = node
        logging.info('%s is the primary', node)
261