#
# Copyright (C) 2006-2010  Nexedi SA
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.

18 19
from time import time

20
from neo import logging
Yoshinori Okuji's avatar
Yoshinori Okuji committed
21
from neo.util import dump
22
from neo.protocol import NodeTypes, NodeStates
Yoshinori Okuji's avatar
Yoshinori Okuji committed
23

24 25
from neo import attributeTracker

Yoshinori Okuji's avatar
Yoshinori Okuji committed
26 27 28
class Node(object):
    """A peer node as known by a node manager.

    Holds the address, UUID, state and (optional) connection of the peer.
    Every change of address, UUID, state or connection is reported to the
    owning manager so that it can keep its indexes up to date.
    """

    def __init__(self, manager, address=None, uuid=None,
            state=NodeStates.UNKNOWN):
        self._manager = manager
        self._address = address
        self._uuid = uuid
        self._state = state
        self._connection = None
        self._last_state_change = time()
        # Register last: the manager reads the attributes set above.
        manager.add(self)

    def notify(self, packet):
        """Forward a notification packet to the node's connection."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.notify(packet)

    def ask(self, packet, *args, **kw):
        """Forward a request packet (and extra arguments) to the connection."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.ask(packet, *args, **kw)

    def answer(self, packet, msg_id=None):
        """Forward an answer packet to the node's connection."""
        assert self.isConnected(), 'Not connected'
        connection = self._connection
        connection.answer(packet, msg_id)

    def getLastStateChange(self):
        """Return the time() value recorded at the last state change."""
        return self._last_state_change

    def getState(self):
        """Return the current state of the node."""
        return self._state

    def setState(self, new_state):
        """Change the node state and tell the manager about the old one.

        Nothing happens when the state is already `new_state`.
        """
        previous = self._state
        if previous == new_state:
            return
        self._state = new_state
        self._last_state_change = time()
        self._manager._updateState(self, previous)

    def setAddress(self, address):
        """Change the node address and tell the manager about the old one.

        Nothing happens when the address is already `address`.
        """
        previous = self._address
        if previous == address:
            return
        self._address = address
        self._manager._updateAddress(self, previous)

    def getAddress(self):
        """Return the address of the node."""
        return self._address

    def setUUID(self, uuid):
        """Change the node UUID and tell the manager about the old one.

        Nothing happens when the UUID is already `uuid`.
        """
        previous = self._uuid
        if previous == uuid:
            return
        self._uuid = uuid
        manager = self._manager
        manager._updateUUID(self, previous)
        manager._updateIdentified(self)

    def getUUID(self):
        """Return the UUID of the node."""
        return self._uuid

    def onConnectionClosed(self):
        """Callback invoked by the node's connection once it is closed."""
        assert self._connection is not None
        self._connection = None
        self._manager._updateIdentified(self)

    def setConnection(self, connection):
        """Attach the connection currently available to reach this node.

        The node registers onConnectionClosed() as the connection's close
        callback.
        """
        assert connection is not None
        assert self._connection is None
        self._connection = connection
        connection.setOnClose(self.onConnectionClosed)
        self._manager._updateIdentified(self)

    def getConnection(self):
        """Return the connection to the node; one must be attached."""
        connection = self._connection
        assert connection is not None
        return connection

    def isConnected(self):
        """Return True if a connection is attached to the node."""
        return self._connection is not None

    def isIdentified(self):
        """Return True if the node has both a connection and a UUID."""
        return not (self._connection is None or self._uuid is None)

    def __repr__(self):
        fields = (
            self.__class__.__name__,
            dump(self._uuid),
            self._address,
            self._state,
            id(self),
        )
        return '<%s(uuid=%s, address=%s, state=%s) at %x>' % fields

    # Type predicates: the base class answers False to all of them and each
    # subclass overrides its own.

    def isMaster(self):
        return False

    def isStorage(self):
        return False

    def isClient(self):
        return False

    def isAdmin(self):
        return False

    # State predicates.

    def isRunning(self):
        return self._state == NodeStates.RUNNING

    def isUnknown(self):
        return self._state == NodeStates.UNKNOWN

    def isTemporarilyDown(self):
        return self._state == NodeStates.TEMPORARILY_DOWN

    def isDown(self):
        return self._state == NodeStates.DOWN

    def isBroken(self):
        return self._state == NodeStates.BROKEN

    def isHidden(self):
        return self._state == NodeStates.HIDDEN

    def isPending(self):
        return self._state == NodeStates.PENDING

    # State shortcuts, all going through setState().

    def setRunning(self):
        self.setState(NodeStates.RUNNING)

    def setUnknown(self):
        self.setState(NodeStates.UNKNOWN)

    def setTemporarilyDown(self):
        self.setState(NodeStates.TEMPORARILY_DOWN)

    def setDown(self):
        self.setState(NodeStates.DOWN)

    def setBroken(self):
        self.setState(NodeStates.BROKEN)

    def setHidden(self):
        self.setState(NodeStates.HIDDEN)

    def setPending(self):
        self.setState(NodeStates.PENDING)

    def asTuple(self):
        """Return (type, address, uuid, state), for use by protocol encoders."""
        return (self.getType(), self._address, self._uuid, self._state)

    def __gt__(self, node):
        # Order by address only when this node has no UUID.
        if self._uuid is None:
            return self._address > node._address
        return self._uuid > node._uuid

    def getType(self):
        """Return the node type registered for this class.

        Raises NotImplementedError when the class has no entry in
        NODE_CLASS_MAPPING.
        """
        try:
            node_type = NODE_CLASS_MAPPING[self.__class__]
        except KeyError:
            raise NotImplementedError
        return node_type

    def whoSetState(self):
        """
          Debugging helper: ask attributeTracker who set the current value
          of the state attribute.
        """
        return attributeTracker.whoSet(self, '_state')

# Register the Node class with attributeTracker. Node.whoSetState() queries
# attributeTracker.whoSet(); presumably this call is what enables that
# tracking -- TODO confirm against the attributeTracker module.
attributeTracker.track(Node)
210

Yoshinori Okuji's avatar
Yoshinori Okuji committed
211 212
class MasterNode(Node):
    """Node specialisation for master nodes."""

    def isMaster(self):
        # Overrides the Node default, which answers False.
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
216 217 218

class StorageNode(Node):
    """Node specialisation for storage nodes."""

    def isStorage(self):
        # Overrides the Node default, which answers False.
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
222 223 224

class ClientNode(Node):
    """Node specialisation for client nodes."""

    def isClient(self):
        # Overrides the Node default, which answers False.
        return True
Yoshinori Okuji's avatar
Yoshinori Okuji committed
228

Aurel's avatar
Aurel committed
229 230
class AdminNode(Node):
    """Node specialisation for admin nodes."""

    def isAdmin(self):
        # Overrides the Node default, which answers False.
        return True
Aurel's avatar
Aurel committed
234

235

236
# Node type -> node class, used to instantiate a node from a type value.
NODE_TYPE_MAPPING = {
    NodeTypes.MASTER: MasterNode,
    NodeTypes.STORAGE: StorageNode,
    NodeTypes.CLIENT: ClientNode,
    NodeTypes.ADMIN: AdminNode,
}

# Node class -> node type, used by Node.getType(). Derived from the table
# above so that the two mappings cannot drift apart.
NODE_CLASS_MAPPING = dict(
    (klass, node_type) for node_type, klass in NODE_TYPE_MAPPING.items())
248

Yoshinori Okuji's avatar
Yoshinori Okuji committed
249 250 251
class NodeManager(object):
    """Registry of known nodes.

    Nodes are kept in a set and indexed by address, UUID, class and state.
    A separate index holds the nodes reporting isIdentified(), keyed by UUID.
    Nodes report their own changes through the _update*() methods.
    """

    # TODO: rework getXXXList() methods, filter first by node type
    # - getStorageList(identified=True, connected=True, )
    # - getList(...)

    def __init__(self):
        # every registered node
        self._node_set = set()
        # address -> node
        self._address_dict = {}
        # UUID -> node
        self._uuid_dict = {}
        # node class -> set of nodes
        self._type_dict = {}
        # node state -> set of nodes
        self._state_dict = {}
        # UUID -> node, only for nodes reporting isIdentified()
        self._identified_dict = {}

    def add(self, node):
        """Register a node in every index; a known node is ignored."""
        if node in self._node_set:
            logging.warning('adding a known node %r, ignoring', node)
            return
        self._node_set.add(node)
        self._updateAddress(node, None)
        self._updateUUID(node, None)
        self.__updateSet(self._type_dict, None, node.__class__, node)
        self.__updateSet(self._state_dict, None, node.getState(), node)
        self._updateIdentified(node)

    def remove(self, node):
        """Unregister a node from every index; an unknown node is ignored."""
        if node not in self._node_set:
            logging.warning('removing unknown node %r, ignoring', node)
            return
        self._node_set.remove(node)
        self.__drop(self._address_dict, node.getAddress())
        self.__drop(self._uuid_dict, node.getUUID())
        self.__dropSet(self._state_dict, node.getState(), node)
        self.__dropSet(self._type_dict, node.__class__, node)
        self._identified_dict.pop(node.getUUID(), None)

    def __drop(self, index_dict, key):
        """Remove `key` from a key -> node index, if present."""
        try:
            del index_dict[key]
        except KeyError:
            # a node may not be indexed by uuid or address, eg.:
            # - a master known by address but without UUID
            # - a client or admin node that doesn't have a listening address
            pass

    def __update(self, index_dict, old_key, new_key, node):
        """ Update an index from old to new key """
        if old_key is not None:
            assert index_dict[old_key] is node, '%r is stored as %s, ' \
                'moving %r to %s' % (index_dict[old_key], old_key, node,
                new_key)
            del index_dict[old_key]
        if new_key is not None:
            index_dict[new_key] = node

    def _updateIdentified(self, node):
        """Synchronise the identified index with node.isIdentified()."""
        uuid = node.getUUID()
        if node.isIdentified():
            self._identified_dict[uuid] = node
        elif uuid in self._identified_dict:
            del self._identified_dict[uuid]

    def _updateAddress(self, node, old_address):
        """Re-index a node whose address was `old_address`."""
        self.__update(self._address_dict, old_address, node.getAddress(), node)

    def _updateUUID(self, node, old_uuid):
        """Re-index a node whose UUID was `old_uuid`.

        The identified index is keyed by UUID too: the entry stored under the
        previous UUID is dropped here, otherwise the node would stay listed
        under a UUID it no longer has. _updateIdentified() registers the node
        under its new UUID.
        """
        self.__update(self._uuid_dict, old_uuid, node.getUUID(), node)
        if old_uuid is not None and old_uuid != node.getUUID() and \
                self._identified_dict.get(old_uuid) is node:
            del self._identified_dict[old_uuid]

    def __dropSet(self, set_dict, key, node):
        """Remove a node from the set stored under `key`, if present."""
        if key in set_dict and node in set_dict[key]:
            set_dict[key].remove(node)

    def __updateSet(self, set_dict, old_key, new_key, node):
        """ Update a set index from old to new key """
        if old_key in set_dict:
            set_dict[old_key].remove(node)
        if new_key is not None:
            set_dict.setdefault(new_key, set()).add(node)

    def _updateState(self, node, old_state):
        """Re-index a node whose state was `old_state`."""
        self.__updateSet(self._state_dict, old_state, node.getState(), node)

    def getList(self, node_filter=None):
        """Return a list of the registered nodes.

        node_filter is an optional callable taking a node; when given, only
        the nodes for which it returns a true value are listed.
        """
        if node_filter is None:
            return list(self._node_set)
        # Build a real list: the builtin filter() is lazy on Python 3.
        return [node for node in self._node_set if node_filter(node)]

    def getIdentifiedList(self, pool_set=None):
        """
            Returns the list of identified nodes
            pool_set is an iterable of UUIDs allowed
        """
        if pool_set is not None:
            identified_nodes = self._identified_dict.items()
            return [v for k, v in identified_nodes if k in pool_set]
        return list(self._identified_dict.values())

    def getConnectedList(self):
        """
            Returns the list of connected nodes
        """
        # TODO: use an index
        return [x for x in self._node_set if x.isConnected()]

    def __getList(self, index_dict, key):
        """Return the set stored under `key`, creating it when missing."""
        return index_dict.setdefault(key, set())

    def getByStateList(self, state):
        """ Get a node list filtered per the node state """
        return list(self.__getList(self._state_dict, state))

    def __getTypeList(self, type_klass, only_identified=False):
        """List the nodes of a class, optionally only the identified ones."""
        node_set = self.__getList(self._type_dict, type_klass)
        if only_identified:
            return [x for x in node_set if x.getUUID() in self._identified_dict]
        return list(node_set)

    def getMasterList(self, only_identified=False):
        """ Return a list with master nodes """
        return self.__getTypeList(MasterNode, only_identified)

    def getStorageList(self, only_identified=False):
        """ Return a list with storage nodes """
        return self.__getTypeList(StorageNode, only_identified)

    def getClientList(self, only_identified=False):
        """ Return a list with client nodes """
        return self.__getTypeList(ClientNode, only_identified)

    def getAdminList(self, only_identified=False):
        """ Return a list with admin nodes """
        return self.__getTypeList(AdminNode, only_identified)

    def getByAddress(self, address):
        """ Return the node that matches a given address, or None """
        return self._address_dict.get(address, None)

    def getByUUID(self, uuid):
        """ Return the node that matches a given UUID, or None """
        return self._uuid_dict.get(uuid, None)

    def hasAddress(self, address):
        """ Tell whether a node is indexed under the given address """
        return address in self._address_dict

    def hasUUID(self, uuid):
        """ Tell whether a node is indexed under the given UUID """
        return uuid in self._uuid_dict

    def _createNode(self, klass, **kw):
        """Instantiate `klass` with this manager as its first argument."""
        return klass(self, **kw)

    def createMaster(self, **kw):
        """ Create and register a new master """
        return self._createNode(MasterNode, **kw)

    def createStorage(self, **kw):
        """ Create and register a new storage """
        return self._createNode(StorageNode, **kw)

    def createClient(self, **kw):
        """ Create and register a new client """
        return self._createNode(ClientNode, **kw)

    def createAdmin(self, **kw):
        """ Create and register a new admin """
        return self._createNode(AdminNode, **kw)

    def _getClassFromNodeType(self, node_type):
        """Return the node class for a node type.

        Raises ValueError when the type has no entry in NODE_TYPE_MAPPING.
        """
        klass = NODE_TYPE_MAPPING.get(node_type)
        if klass is None:
            raise ValueError('Unknown node type : %s' % node_type)
        return klass

    def createFromNodeType(self, node_type, **kw):
        """ Create and register a new node of the given node type """
        return self._createNode(self._getClassFromNodeType(node_type), **kw)

    def init(self):
        """Forget every node and empty all the indexes."""
        self._node_set.clear()
        self._type_dict.clear()
        self._state_dict.clear()
        self._uuid_dict.clear()
        self._address_dict.clear()
        # The identified index must be reset too, otherwise nodes dropped
        # here would still be returned by getIdentifiedList().
        self._identified_dict.clear()

    def update(self, node_list):
        """Apply a list of node descriptions to the registry.

        node_list is an iterable of (node_type, address, uuid, state) tuples.
        For each entry the node is looked up by UUID, then by address:
        - unknown node: created, unless the received state is DOWN;
        - known node, state DOWN: its connection (if any) is closed and the
          node is removed;
        - known node, other state: UUID, address and state are updated.
        Raises ValueError when a node type is unknown.
        """
        for node_type, addr, uuid, state in node_list:
            # This should be done here (although klass might not be used in this
            # iteration), as it raises if type is not valid.
            klass = self._getClassFromNodeType(node_type)

            # lookup in current table
            node_by_uuid = self.getByUUID(uuid)
            node_by_addr = self.getByAddress(addr)
            node = node_by_uuid or node_by_addr

            log_args = (node_type, dump(uuid), addr, state)
            if node is None:
                if state == NodeStates.DOWN:
                    logging.debug('NOT creating node %s %s %s %s', *log_args)
                else:
                    node = self._createNode(klass, address=addr, uuid=uuid,
                            state=state)
                    logging.debug('creating node %r', node)
            else:
                assert isinstance(node, klass), 'node %r is not ' \
                    'of expected type: %r' % (node, klass)
                assert None in (node_by_uuid, node_by_addr) or \
                    node_by_uuid is node_by_addr, \
                    'Discrepancy between node_by_uuid (%r) and ' \
                    'node_by_addr (%r)' % (node_by_uuid, node_by_addr)
                if state == NodeStates.DOWN:
                    logging.debug('droping node %r (%r), found with %s %s %s %s',
                        node, node.isConnected(), *log_args)
                    if node.isConnected():
                        # cut this connection, node removed by handler
                        node.getConnection().close()
                    self.remove(node)
                else:
                    logging.debug('updating node %r to %s %s %s %s',
                        node, *log_args)
                    node.setUUID(uuid)
                    node.setAddress(addr)
                    node.setState(state)
        self.log()

    def log(self):
        """Log the number of registered nodes, then one line per node."""
        logging.info('Node manager : %d nodes', len(self._node_set))
        for node in sorted(self._node_set):
            # nodes without UUID are shown as a row of dashes
            uuid = dump(node.getUUID()) or '-' * 32
            address = node.getAddress() or ''
            if address:
                address = '%s:%d' % address
            logging.info(' * %32s | %8s | %22s | %s',
                uuid, node.getType(), address, node.getState())