#
# Copyright (C) 2006-2017  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

import sys, weakref
from collections import defaultdict
from time import time

from neo.lib import logging
from neo.lib.app import BaseApplication
from neo.lib.debug import register as registerLiveDebugger
from neo.lib.protocol import uuid_str, UUID_NAMESPACES, ZERO_TID
from neo.lib.protocol import ClusterStates, NodeStates, NodeTypes, Packets
from neo.lib.handler import EventHandler
from neo.lib.connection import ListeningConnection, ClientConnection
from neo.lib.exception import ElectionFailure, PrimaryFailure, StoppedOperation

class StateChangedException(Exception): pass

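# Note: a crude monotonic clock. monotonic_time() returns time() as long as
# the clock moves forward, and otherwise bumps the previous value by 1 ms, so
# repeated calls always yield strictly increasing values even if the system
# clock stalls or steps backwards. It is used in this module to timestamp
# NotifyNodeInformation packets.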
_previous_time = 0
def monotonic_time():
    global _previous_time
    now = time()
    if _previous_time < now:
        _previous_time = now
    else:
        _previous_time = now = _previous_time + 1e-3
    return now

from .backup_app import BackupApplication
from .handlers import election, identification, secondary
from .handlers import administration, client, storage
from .pt import PartitionTable
from .recovery import RecoveryManager
from .transactions import TransactionManager
from .verification import VerificationManager


class Application(BaseApplication):
    """The master node application."""
    packing = None
    storage_readiness = 0
    # Latest completely committed TID
    last_transaction = ZERO_TID
    backup_tid = None
    backup_app = None
    truncate_tid = None
    uuid = property(
        lambda self: self._node.getUUID(),
        lambda self, uuid: self._node.setUUID(uuid))

    def __init__(self, config):
        super(Application, self).__init__(
            config.getSSL(), config.getDynamicMasterList())
        self.tm = TransactionManager(self.onTransactionCommitted)

        self.name = config.getCluster()
        self.server = config.getBind()
        self.autostart = config.getAutostart()

        self.storage_ready_dict = {}
        self.storage_starting_set = set()
        for master_address in config.getMasters():
            self.nm.createMaster(address=master_address)
        self._node = self.nm.createMaster(address=self.server,
                                          uuid=config.getUUID())

        logging.debug('IP address is %s, port is %d', *self.server)

        # Partition table
        replicas, partitions = config.getReplicas(), config.getPartitions()
        if replicas < 0:
            raise RuntimeError, 'replicas must be a non-negative integer'
        if partitions <= 0:
            raise RuntimeError, 'partitions must be more than zero'
        self.pt = PartitionTable(partitions, replicas)
        logging.info('Configuration:')
        logging.info('Partitions: %d', partitions)
        logging.info('Replicas  : %d', replicas)
        logging.info('Name      : %s', self.name)

        self.listening_conn = None
        self.primary = None
        self.primary_master_node = None
        self.cluster_state = None

        # election related data
        self.unconnected_master_node_set = set()
        self.negotiating_master_node_set = set()
        self.master_address_dict = weakref.WeakKeyDictionary()

        self._current_manager = None

        # backup
        upstream_cluster = config.getUpstreamCluster()
        if upstream_cluster:
            if upstream_cluster == self.name:
                raise ValueError("upstream cluster name must be"
                                 " different from cluster name")
            self.backup_app = BackupApplication(self, upstream_cluster,
                                                config.getUpstreamMasters())

        self.administration_handler = administration.AdministrationHandler(
            self)
        self.secondary_master_handler = secondary.SecondaryMasterHandler(self)
        self.client_service_handler = client.ClientServiceHandler(self)
        self.client_ro_service_handler = client.ClientReadOnlyServiceHandler(self)
        self.storage_service_handler = storage.StorageServiceHandler(self)

        registerLiveDebugger(on_log=self.log)

    def close(self):
        self.listening_conn = None
        if self.backup_app is not None:
            self.backup_app.close()
        super(Application, self).close()

    def log(self):
        self.em.log()
        if self.backup_app is not None:
            self.backup_app.log()
        self.nm.log()
        self.tm.log()
        if self.pt is not None:
            self.pt.log()

    def run(self):
        try:
            self._run()
        except Exception:
            logging.exception('Pre-mortem data:')
            self.log()
            logging.flush()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop."""
        # Make a listening port.
        self.listening_conn = ListeningConnection(self, None, self.server)

        # Start a normal operation.
        while self.cluster_state != ClusterStates.STOPPING:
            # (Re)elect a new primary master.
            self.primary = not self.nm.getMasterList()
            if not self.primary:
                self.electPrimary()
            try:
                if self.primary:
                    self.playPrimaryRole()
                else:
                    self.playSecondaryRole()
                raise RuntimeError, 'should not reach here'
            except (ElectionFailure, PrimaryFailure):
                # Forget all connections.
                for conn in self.em.getClientList():
                    conn.close()


    def electPrimary(self):
        """Elect a primary master node.

        The difficulty is that a master node must accept connections from
        others while attempting to connect to other master nodes at the
        same time. Note that storage nodes and client nodes may connect
        to self as well as master nodes."""
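        # Bookkeeping used below: unconnected_master_node_set holds the
        # addresses of known masters that have not been contacted yet, and
        # negotiating_master_node_set those with a handshake in progress.
        # Both sets are presumably drained by the election handlers as peers
        # answer; the negotiation loop ends once they are empty, and an
        # ElectionFailure restarts the whole election.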
        logging.info('begin the election of a primary master')

        client_handler = election.ClientElectionHandler(self)
        self.unconnected_master_node_set.clear()
        self.negotiating_master_node_set.clear()
        self.master_address_dict.clear()
        self.listening_conn.setHandler(election.ServerElectionHandler(self))
        getByAddress = self.nm.getByAddress

        while True:

            # handle new connected masters
            for node in self.nm.getMasterList():
                if node is self._node:
                    continue
                node.setUnknown()
                self.unconnected_master_node_set.add(node.getAddress())

            # start the election process
            self.primary = None
            self.primary_master_node = None
            try:
                while (self.unconnected_master_node_set or
                        self.negotiating_master_node_set):
                    for addr in self.unconnected_master_node_set:
                        self.negotiating_master_node_set.add(addr)
                        ClientConnection(self, client_handler,
                            # XXX: Ugly, but the whole election code will be
                            # replaced soon
                            getByAddress(addr))
                    self.unconnected_master_node_set.clear()
                    self.em.poll(1)
            except ElectionFailure, m:
                # something went wrong, clean up then restart
                logging.error('election failed: %s', m)

                # Ask all connected nodes to reelect a single primary master.
                for conn in self.em.getClientList():
                    conn.send(Packets.ReelectPrimary())
                    conn.abort()

                # Wait until the connections are closed.
                self.primary = None
                self.primary_master_node = None
                # XXX: Since poll does not wake up anymore every second,
                #      the following time condition should be reviewed.
                #      See also playSecondaryRole.
                t = time() + 10
                while self.em.getClientList() and time() < t:
                    try:
                        self.em.poll(1)
                    except ElectionFailure:
                        pass

                # Close all connections.
                for conn in self.em.getClientList() + self.em.getServerList():
                    conn.close()
            else:
                # election succeeded, stop the process
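                # (self.primary is still None here only if no peer claimed
                # primacy during the negotiation; the election handlers
                # presumably reset it otherwise, so the next line makes this
                # node primary exactly in that case.)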
                self.primary = self.primary is None
                break

    def getNodeInformationDict(self, node_list):
        node_dict = defaultdict(list)
        # group modified nodes by destination node type
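        # (cascade: ADMIN and STORAGE connections are told about every
        # non-admin change, CLIENT connections only about master and storage
        # changes, MASTER connections only about master changes; changes
        # about admin nodes are not broadcast at all)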
        for node in node_list:
            node_info = node.asTuple()
            if node.isAdmin():
                continue
            node_dict[NodeTypes.ADMIN].append(node_info)
            node_dict[NodeTypes.STORAGE].append(node_info)
            if node.isClient():
                continue
            node_dict[NodeTypes.CLIENT].append(node_info)
            if node.isStorage():
                continue
            node_dict[NodeTypes.MASTER].append(node_info)
        return node_dict

    def broadcastNodesInformation(self, node_list, exclude=None):
        """
          Broadcast changes for a set of nodes.
          Send only one packet per connection to reduce bandwidth.
        """
        node_dict = self.getNodeInformationDict(node_list)
        now = monotonic_time()
        # send at most one non-empty notification packet per node
        for node in self.nm.getIdentifiedList():
            node_list = node_dict.get(node.getType())
            # We don't skip pending storage nodes because we don't send them
            # the full list of nodes when they're added, and it's also quite
            # useful to notify them about new masters.
            if node_list and node is not exclude:
                node.send(Packets.NotifyNodeInformation(now, node_list))

    def broadcastPartitionChanges(self, cell_list):
        """Broadcast a Notify Partition Changes packet."""
        if cell_list:
            ptid = self.pt.setNextID()
            self.pt.logUpdated()
            packet = Packets.NotifyPartitionChanges(ptid, cell_list)
            for node in self.nm.getIdentifiedList():
                # As for broadcastNodesInformation, we don't send the full PT
                # when pending storage nodes are added, so keep them notified.
                if not node.isMaster():
                    node.send(packet)

    def provideService(self):
        """
        This is the normal mode for a primary master node. Handle transactions
        and stop the service only if a catastrophe happens or the user requests
        a shutdown.
        """
        logging.info('provide service')
        poll = self.em.poll
        self.changeClusterState(ClusterStates.RUNNING)

        # Now everything is passive.
        try:
            while True:
                poll(1)
        except StateChangedException, e:
            if e.args[0] != ClusterStates.STARTING_BACKUP:
                raise
            self.backup_tid = tid = self.getLastTransaction()
            packet = Packets.StartOperation(True)
            tid_dict = {}
            for node in self.nm.getStorageList(only_identified=True):
                tid_dict[node.getUUID()] = tid
                if node.isRunning():
                    node.send(packet)
            self.pt.setBackupTidDict(tid_dict)

    def playPrimaryRole(self):
        logging.info('play the primary role with %r', self.listening_conn)
        self.master_address_dict.clear()
        em = self.em
        packet = Packets.AnnouncePrimary()
        for conn in em.getConnectionList():
            if conn.isListening():
                conn.setHandler(identification.IdentificationHandler(self))
            else:
                conn.send(packet)
                # The primary master should establish connections to all
                # secondaries, rather than the other way around. This requires
                # a bit more work when a new master joins a cluster but makes
                # it easier to resolve UUID conflicts with minimal cluster
                # impact, and ensures primary master uniqueness (primary
                # masters become noisy, in that they actively try to maintain
                # connections to all other master nodes, so duplicate
                # primaries will eventually get in touch with each other and
                # resolve the situation with a duel).
                # TODO: only abort client connections, don't close server
                # connections as we want to have them in the end. Secondary
                # masters will reconnect nevertheless, but it's dirty.
                # Currently, it's not trivial to preserve connected nodes,
                # because of poor node status tracking during election.
                # XXX: The above comment is partially wrong in that the primary
                # master is now responsible for allocating node ids, and all
                # other nodes must only create/update/remove nodes when
                # processing node notifications. We probably want to keep the
                # current behaviour: having only server connections.
                conn.abort()

        # If I know any storage nodes, make sure that they are not in the
        # running state, because they are not connected at this stage.
        for node in self.nm.getStorageList():
            if node.isRunning():
                node.setTemporarilyDown()

        if self.uuid is None:
            self.uuid = self.getNewUUID(None, self.server, NodeTypes.MASTER)
            logging.info('My UUID: ' + uuid_str(self.uuid))
        self._node.setRunning()

        # Do not restart automatically if ElectionFailure is raised, in order
        # to avoid a split of the database. For example, with 2 machines with
        # a master and a storage on each one and replicas=1, the secondary
        # master becomes primary in case of network failure between the 2
        # machines but must not start automatically: otherwise, each storage
        # node would diverge.
        self._startup_allowed = False
        try:
            while True:
                self.runManager(RecoveryManager)
                try:
                    self.runManager(VerificationManager)
                    if not self.backup_tid:
                        self.provideService()
                        # self.provideService only returns without raising
                        # when switching to backup mode.
                    if self.backup_app is None:
                        raise RuntimeError("No upstream cluster to backup"
                                           " defined in configuration")
                    truncate = Packets.Truncate(
                        self.backup_app.provideService())
                except StoppedOperation, e:
                    logging.critical('No longer operational')
                    truncate = Packets.Truncate(*e.args) if e.args else None
                    # Restart automatically, except if we truncate or still
                    # have to truncate.
                    self._startup_allowed = not (self.truncate_tid or truncate)
                self.storage_readiness = 0
                self.storage_ready_dict.clear()
                self.storage_starting_set.clear()
                node_list = []
                for node in self.nm.getIdentifiedList():
                    if node.isStorage() or node.isClient():
                        conn = node.getConnection()
                        conn.send(Packets.StopOperation())
                        if node.isClient():
                            conn.abort()
                            continue
                        if truncate:
                            conn.send(truncate)
                        if node.isRunning():
                            node.setPending()
                            node_list.append(node)
                self.broadcastNodesInformation(node_list)
        except StateChangedException, e:
            assert e.args[0] == ClusterStates.STOPPING
            self.shutdown()

    def playSecondaryRole(self):
        """
        I play a secondary role, thus only wait for a primary master to fail.
        """
        logging.info('play the secondary role with %r', self.listening_conn)

        # Wait for an announcement. If this takes too long, the primary
        # master is probably down.
        # XXX: Same remark as in electPrimary.
        t = time() + 10
        while self.primary_master_node is None:
            self.em.poll(1)
            if t < time():
                # election timeout
                raise ElectionFailure("Election timeout")
        self.master_address_dict.clear()

        # Restart completely. Not optimized,
        # but lower-level code needs to be stabilized first.
        for conn in self.em.getConnectionList():
            if not conn.isListening():
                conn.close()

        # Reconnect to the primary master node.
        self.nm.reset()
        primary_handler = secondary.PrimaryHandler(self)
        ClientConnection(self, primary_handler, self.primary_master_node)

        # and use another handler for future incoming connections
        self.listening_conn.setHandler(
            identification.SecondaryIdentificationHandler(self))

        while True:
            self.em.poll(1)

    def runManager(self, manager_klass):
        self._current_manager = manager_klass(self)
        try:
            self._current_manager.run()
        finally:
            self._current_manager = None

    def changeClusterState(self, state):
        """
        Change the cluster state and apply the right handler on each connection
        """
        if self.cluster_state == state:
            return

        # select the storage handler
        if state in (ClusterStates.RUNNING, ClusterStates.STARTING_BACKUP,
                     ClusterStates.BACKINGUP, ClusterStates.STOPPING_BACKUP):
            storage_handler = self.storage_service_handler
        elif self._current_manager is not None:
            storage_handler = self._current_manager.getHandler()
        elif state == ClusterStates.STOPPING:
            storage_handler = None
        else:
            raise RuntimeError('Unexpected cluster state')

        # change handlers
        notification_packet = Packets.NotifyClusterInformation(state)
        for node in self.nm.getIdentifiedList():
            conn = node.getConnection()
            conn.send(notification_packet)
            if node.isClient():
                if state == ClusterStates.RUNNING:
                    handler = self.client_service_handler
                elif state == ClusterStates.BACKINGUP:
                    handler = self.client_ro_service_handler
                else:
                    if state != ClusterStates.STOPPING:
                        conn.abort()
                    continue
            elif node.isStorage() and storage_handler:
                handler = storage_handler
            else:
                continue # keep handler
            if type(handler) is not type(conn.getLastHandler()):
                conn.setHandler(handler)
                handler.connectionCompleted(conn, new=False)
        self.cluster_state = state

    def getNewUUID(self, uuid, address, node_type):
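        # Keep the requested uuid if it is set, is not our own, and does not
        # clash with an existing node at a different address. Otherwise,
        # allocate a new id: UUID_NAMESPACES selects a per-node-type range of
        # ids, and the first unused value in that range is returned.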
        getByUUID = self.nm.getByUUID
        if None != uuid != self.uuid:
            node = getByUUID(uuid)
            if node is None or node.getAddress() == address:
                return uuid
        hob = UUID_NAMESPACES[node_type]
        for uuid in xrange((hob << 24) + 1, hob + 0x10 << 24):
            if uuid != self.uuid and getByUUID(uuid) is None:
                return uuid
        raise RuntimeError

    def getClusterState(self):
        return self.cluster_state

    def shutdown(self):
        """Close all connections and exit"""
        self.changeClusterState(ClusterStates.STOPPING)
        # Marking a fictional storage node as starting operation blocks any
        # request to start a new transaction. Doing it this way has 2 advantages:
        # - It's simpler than changing the handler of all clients,
        #   which is anyway not supported by EventQueue.
        # - Returning an error code would cause activity on client side for
        #   nothing.
        # What's important is to not abort during the second phase of commits
        # and for this, clients must even be able to reconnect, in case of
        # failure during tpc_finish.
        # We're rarely involved in the vote, so we have to trust clients to
        # abort any transaction that is still in the first phase.
        self.storage_starting_set.add(None)
        try:
            # wait for all transactions to be finished
            while self.tm.hasPending():
                self.em.poll(1)
        except StoppedOperation:
            logging.critical('No longer operational')

        logging.info("asking remaining nodes to shutdown")
        self.listening_conn.close()
        handler = EventHandler(self)
        for node in self.nm.getConnectedList():
            conn = node.getConnection()
            if node.isStorage():
                conn.setHandler(handler)
                conn.send(Packets.NotifyNodeInformation(monotonic_time(), ((
                    node.getType(), node.getAddress(), node.getUUID(),
                    NodeStates.TEMPORARILY_DOWN, None),)))
                conn.abort()
            elif conn.pending():
                conn.abort()
            else:
                conn.close()

        while self.em.connection_dict:
            self.em.poll(1)

        # then shutdown
        sys.exit()

    def identifyStorageNode(self, known):
        if known:
            state = NodeStates.RUNNING
        else:
            # same as for verification
            state = NodeStates.PENDING
        return state, self.storage_service_handler

    def onTransactionCommitted(self, txn):
        # I have received all the lock answers now:
        # - send a Notify Transaction Finished to the initiating client node
        # - Invalidate Objects to the other client nodes
        ttid = txn.getTTID()
        tid = txn.getTID()
        transaction_node = txn.getNode()
        invalidate_objects = Packets.InvalidateObjects(tid, txn.getOIDList())
        for client_node in self.nm.getClientList(only_identified=True):
            if client_node is transaction_node:
                client_node.send(Packets.AnswerTransactionFinished(ttid, tid),
                                 msg_id=txn.getMessageId())
            else:
                client_node.send(invalidate_objects)

        # Unlock Information to relevant storage nodes.
        notify_unlock = Packets.NotifyUnlockInformation(ttid)
        getByUUID = self.nm.getByUUID
        for storage_uuid in txn.getUUIDList():
            getByUUID(storage_uuid).send(notify_unlock)

        # Notify storage nodes whose replication is blocked by this
        # transaction, and clients that try to recover from a failure during
        # tpc_finish.
        notify_finished = Packets.NotifyTransactionFinished(ttid, tid)
        for uuid in txn.getNotificationUUIDList():
            node = getByUUID(uuid)
            if node.isClient():
                # There should be only 1 client interested.
                node.answer(Packets.AnswerFinalTID(tid))
            else:
                node.send(notify_finished)

        assert self.last_transaction < tid, (self.last_transaction, tid)
        self.setLastTransaction(tid)

    def getLastTransaction(self):
        return self.last_transaction

    def setLastTransaction(self, tid):
        self.last_transaction = tid

    def setStorageNotReady(self, uuid):
        self.storage_starting_set.discard(uuid)
        self.storage_ready_dict.pop(uuid, None)
        self.tm.executeQueuedEvents()

    def startStorage(self, node):
        node.send(Packets.StartOperation(self.backup_tid))
        uuid = node.getUUID()
        assert uuid not in self.storage_starting_set
        if uuid not in self.storage_ready_dict:
            self.storage_starting_set.add(uuid)

    def setStorageReady(self, uuid):
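        # storage_readiness acts as a monotonically increasing counter: each
        # storage records the value reached when it became ready, so that
        # getStorageReadySet(readiness) can return the storages that were
        # already ready at or before a given readiness value.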
        self.storage_starting_set.remove(uuid)
        assert uuid not in self.storage_ready_dict, self.storage_ready_dict
        self.storage_readiness = self.storage_ready_dict[uuid] = \
            self.storage_readiness + 1
        self.tm.executeQueuedEvents()

    def isStorageReady(self, uuid):
        return uuid in self.storage_ready_dict

    def getStorageReadySet(self, readiness=float('inf')):
        return {k for k, v in self.storage_ready_dict.iteritems()
                  if v <= readiness}