#
# Copyright (C) 2006-2019  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

17
from neo.lib import logging
18
from neo.lib.app import BaseApplication, buildOptionParser
19 20
from neo.lib.connection import ListeningConnection
from neo.lib.exception import PrimaryFailure
21
from .handler import AdminEventHandler, MasterEventHandler, \
22
    MasterRequestEventHandler
23 24
from neo.lib.bootstrap import BootstrapManager
from neo.lib.pt import PartitionTable
25
from neo.lib.protocol import ClusterStates, Errors, NodeTypes, Packets
26
from neo.lib.debug import register as registerLiveDebugger
27

28
@buildOptionParser
29
class Application(BaseApplication):
    """The admin node application."""

32 33 34 35 36 37 38 39 40 41
    @classmethod
    def _buildOptionParser(cls):
        """Declare the options understood by this node.

        Sets the description on ``cls.option_parser``, registers the common
        server options (called with 'admin' and '127.0.0.1:9999'), then adds
        an integer 'uuid' option (short name 'u') to the 'admin' group.
        """
        parser = cls.option_parser
        parser.description = "NEO Admin node"
        cls.addCommonServerOptions('admin', '127.0.0.1:9999')

        admin_options = parser.group('admin')
        admin_options.int(
            'u', 'uuid',
            help="specify an UUID to use for this process (testing purpose)")

42
    def __init__(self, config):
        """Set up the application from ``config``.

        ``config`` is read with ``get()`` for 'ssl', 'dynamic_master_list'
        and 'uuid', and by subscript for 'masters', 'cluster' and 'bind'.
        One master node is created in the node manager for every address
        listed in 'masters'.
        """
        ssl = config.get('ssl')
        dynamic_master_list = config.get('dynamic_master_list')
        super(Application, self).__init__(ssl, dynamic_master_list)
        for master_address in config['masters']:
            self.nm.createMaster(address=master_address)

        self.name = config['cluster']
        self.server = config['bind']

        logging.debug('IP address is %s, port is %d', *self.server)

        # The partition table can only be built once the number of
        # partitions is known, so it stays unset until then.
        self.pt = None
        node_uuid = config.get('uuid')
        self.uuid = node_uuid
        logging.node(self.name, node_uuid)
        self.request_handler = MasterRequestEventHandler(self)
        self.master_event_handler = MasterEventHandler(self)
        self.cluster_state = None
        self.reset()
        registerLiveDebugger(on_log=self.log)

64 65
    def close(self):
        """Drop the reference to the listening connection, then delegate to
        the base class ``close()``.
        """
        self.listening_conn = None
        super(Application, self).close()
67

68
    def reset(self):
        """Clear the master connection and node, and mark the application
        as not bootstrapped.
        """
        self.master_conn = self.master_node = None
        self.bootstrapped = False
72 73 74 75 76 77

    def log(self):
        """Call ``log()`` on ``self.em``, ``self.nm`` and, when it is set,
        on the partition table.
        """
        self.em.log()
        self.nm.log()
        partition_table = self.pt
        if partition_table is None:
            # No partition table has been built yet.
            return
        partition_table.log()
Aurel's avatar
Aurel committed
78

Aurel's avatar
Aurel committed
79
    def run(self):
        """Run the application through ``_run()``.

        If an ``Exception`` escapes ``_run()``, it is reported through
        ``logging.exception``, ``self.log()`` is called, the logging
        backend is flushed, and the exception is re-raised unchanged.
        """
        try:
            self._run()
        except Exception:
            # Record as much context as possible before propagating.
            logging.exception('Pre-mortem data:')
            self.log()
            logging.flush()
            raise

    def _run(self):
        """Make sure that the status is sane and start a loop.

        Opens the listening connection, then repeatedly connects to the
        primary master and polls ``self.em`` until the cluster state is
        ``ClusterStates.STOPPING``. A ``PrimaryFailure`` raised while polling
        is logged and leads to a new connection attempt. On exit, the
        listening connection is closed and ``self.em`` is polled until it
        reports being idle.

        Raises RuntimeError if the cluster name is empty.
        """
        if not self.name:
            # Use the call form of ``raise``: the ``raise E, 'msg'`` statement
            # form only exists in Python 2.
            raise RuntimeError('cluster name must be non-empty')

        # Make a listening port.
        handler = AdminEventHandler(self)
        self.listening_conn = ListeningConnection(self, handler, self.server)

        while self.cluster_state != ClusterStates.STOPPING:
            self.connectToPrimary()
            try:
                # Poll until the handlers signal the loss of the primary.
                while True:
                    self.em.poll(1)
            except PrimaryFailure:
                logging.error('primary master is down')
        self.listening_conn.close()
        # Keep polling until the event manager reports being idle.
        while not self.em.isIdle():
            self.em.poll(1)
Aurel's avatar
Aurel committed
107

108
    def connectToPrimary(self):
        """Find a primary master node, and connect to it.

        If a primary master node is not elected or ready, repeat
        the attempt of a connection periodically.

        Note that I do not accept any connection from non-master nodes
        at this stage.

        Raises RuntimeError if a partition table already exists and the
        number of partitions or replicas announced by the master differs
        from it.
        """
        self.cluster_state = None
        # search, find, connect and identify to the primary master
        bootstrap = BootstrapManager(self, NodeTypes.ADMIN, self.server)
        node, conn, num_partitions, num_replicas = \
            bootstrap.getPrimaryConnection()
        self.master_node = node
        self.master_conn = conn

        partition_table = self.pt
        if partition_table is None:
            # First successful bootstrap: build the table now that its
            # dimensions are known.
            self.pt = PartitionTable(num_partitions, num_replicas)
        else:
            if partition_table.getPartitions() != num_partitions:
                # XXX: shouldn't we recover instead of raising ?
                raise RuntimeError('the number of partitions is inconsistent')
            if partition_table.getReplicas() != num_replicas:
                # XXX: shouldn't we recover instead of raising ?
                raise RuntimeError('the number of replicas is inconsistent')

        # passive handler
        conn.setHandler(self.master_event_handler)
        conn.ask(Packets.AskClusterState())
        conn.ask(Packets.AskPartitionTable())
Aurel's avatar
Aurel committed
136

137
    def sendPartitionTable(self, conn, min_offset, max_offset, uuid):
        """Answer ``conn`` with rows of the partition table.

        Rows are built for offsets ``min_offset`` (inclusive) to
        ``max_offset`` (exclusive); a ``max_offset`` of 0 means "up to the
        number of partitions". Each row is ``(offset, cells)`` where
        ``cells`` lists ``(cell UUID, cell state)`` pairs, restricted to the
        cells whose UUID equals ``uuid`` when ``uuid`` is not None.

        If an IndexError is raised while building the rows, a protocol error
        is sent on ``conn`` instead of the answer.
        """
        # we have a pt
        pt = self.pt
        pt.log()
        rows = []
        stop = pt.getPartitions() if max_offset == 0 else max_offset
        try:
            for offset in xrange(min_offset, stop):
                cells = []
                try:
                    for cell in pt.getCellList(offset):
                        if uuid is not None and cell.getUUID() != uuid:
                            continue
                        cells.append((cell.getUUID(), cell.getState()))
                except TypeError:
                    # Keep whatever was collected for this offset.
                    pass
                rows.append((offset, cells))
        except IndexError:
            conn.send(Errors.ProtocolError('invalid partition table offset'))
            return
        conn.answer(Packets.AnswerPartitionList(pt.getID(), rows))