handler.py 19.4 KB
Newer Older
Aurel's avatar
Aurel committed
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

Aurel's avatar
Aurel committed
18 19 20
import logging

from neo.handler import EventHandler
21
from neo.connection import MTClientConnection
22
from neo import protocol
23
from neo.protocol import Packet, \
24
        MASTER_NODE_TYPE, STORAGE_NODE_TYPE, CLIENT_NODE_TYPE, \
25
        INVALID_UUID, RUNNING_STATE, TEMPORARILY_DOWN_STATE, \
26
        BROKEN_STATE, FEEDING_STATE, DISCARDED_STATE 
Aurel's avatar
Aurel committed
27 28
from neo.node import MasterNode, StorageNode, ClientNode
from neo.pt import PartitionTable
29
from neo.client.exception import NEOStorageError
30
from neo.exception import ElectionFailure
31
from neo.util import dump
32
from neo.handler import identification_required, restrict_node_types
Aurel's avatar
Aurel committed
33 34

from ZODB.TimeStamp import TimeStamp
35
from ZODB.utils import p64
36

37 38

class BaseHandler(EventHandler):
    """Base class for client-side EventHandler implementations.

    Received packets are either put on the queue the dispatcher returns
    for them, or dispatched immediately in the calling thread when the
    dispatcher has no queue for them.
    """

    def __init__(self, app, dispatcher):
        # app: application object whose state the handlers read and update.
        # dispatcher: provides getQueue() (used below) and register() (used
        # by subclasses) to route answers to registered queues.
        self.app = app
        self.dispatcher = dispatcher
        super(BaseHandler, self).__init__()

    def dispatch(self, conn, packet):
        """Dispatch `packet` while holding the lock of `conn`.

        The lock is taken before calling the superclass's dispatch method,
        which covers the case where the handler sends a response to the
        received packet.
        """
        conn.lock()
        try:
            super(BaseHandler, self).dispatch(conn, packet)
        finally:
            # Pair lock() with unlock(), like every other locked section
            # of this module.
            conn.unlock()

    def packetReceived(self, conn, packet):
        """Redirect a received packet to the queue waiting for it.

        If the dispatcher returns no queue for this packet it is dispatched
        right away; otherwise the (conn, packet) pair is put on the queue.
        """
        queue = self.dispatcher.getQueue(conn, packet)
        if queue is None:
            self.dispatch(conn, packet)
        else:
            queue.put((conn, packet))
63

Yoshinori Okuji's avatar
Yoshinori Okuji committed
64

65
class PrimaryBootstrapHandler(BaseHandler):
    """Bootstrap handler used when looking for the primary master.

    If the connection fails, closes, breaks or times out while no primary
    master is known yet, app.primary_master_node is set to -1 (presumably
    read by the application's bootstrap loop as "this attempt failed" -
    confirm) before the default processing of the event runs.
    """

    def _markPrimaryMasterLookupFailed(self):
        # Only record the failure while no primary master has been chosen;
        # an already-known primary master node is left untouched.
        if self.app.primary_master_node is None:
            self.app.primary_master_node = -1

    def connectionFailed(self, conn):
        self._markPrimaryMasterLookupFailed()
        super(PrimaryBootstrapHandler, self).connectionFailed(conn)

    def connectionClosed(self, conn):
        self._markPrimaryMasterLookupFailed()
        super(PrimaryBootstrapHandler, self).connectionClosed(conn)

    def peerBroken(self, conn):
        self._markPrimaryMasterLookupFailed()
        super(PrimaryBootstrapHandler, self).peerBroken(conn)

    def timeoutExpired(self, conn):
        self._markPrimaryMasterLookupFailed()
        super(PrimaryBootstrapHandler, self).timeoutExpired(conn)

    def handleNotReady(self, conn, packet, message):
        """The master answered "not ready": flag the node as not ready."""
        self.app.setNodeNotReady()

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas, your_uuid):
        """Complete identification with a master node.

        The connection is closed if the peer is not a master node, or if it
        announces a server address other than the one we connected to (the
        node is then also removed from the node manager). Otherwise the
        peer's UUID is recorded, our own UUID is taken from `your_uuid` when
        valid, the partition table is created if none exists yet, and
        askPrimaryMaster is sent with its answer routed to
        app.local_var.queue.
        """
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        # this must be a master node
        if node_type != MASTER_NODE_TYPE:
            conn.close()
            return
        if conn.getAddress() != (ip_address, port):
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                          conn.getAddress()[0], conn.getAddress()[1],
                          ip_address, port)
            app.nm.remove(node)
            conn.close()
            return

        conn.setUUID(uuid)
        node.setUUID(uuid)

        if your_uuid != INVALID_UUID:
            # got an uuid from the primary master
            app.uuid = your_uuid

        # Create partition table if necessary
        if app.pt is None:
            app.pt = PartitionTable(num_partitions, num_replicas)
            app.num_partitions = num_partitions
            app.num_replicas = num_replicas

        # Ask a primary master. Sending the request and registering the
        # answer queue happen under the connection lock, presumably so the
        # answer cannot be processed before its queue is registered.
        conn.lock()
        try:
            msg_id = conn.ask(protocol.askPrimaryMaster())
            self.dispatcher.register(conn, msg_id, app.local_var.queue)
        finally:
            conn.unlock()

    @identification_required
    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        """Register the known masters and, if given, the primary master.

        Raises ElectionFailure if a primary master with another UUID was
        already recorded.
        """
        uuid = conn.getUUID()
        app = self.app
        node = app.nm.getNodeByUUID(uuid)
        # This must be sent only by master node
        if node.getNodeType() != MASTER_NODE_TYPE:
            conn.close()
            return

        # Register new master nodes.
        for ip_address, port, uuid in known_master_list:
            addr = (ip_address, port)
            n = app.nm.getNodeByServer(addr)
            if n is None:
                n = MasterNode(server = addr)
                app.nm.add(n)
            if uuid != INVALID_UUID and n.getUUID() != uuid:
                # Believe the UUID the peer reports, whether we knew no
                # UUID for this node or a different one.
                n.setUUID(uuid)

        if primary_uuid != INVALID_UUID:
            # The primary master is defined.
            if app.primary_master_node is not None \
                    and app.primary_master_node.getUUID() != primary_uuid:
                # There are multiple primary master nodes. This is
                # dangerous.
                raise ElectionFailure('multiple primary master nodes')
            primary_node = app.nm.getNodeByUUID(primary_uuid)
            if primary_node is None:
                # I don't know such a node. Probably this information
                # is old. So ignore it.
                pass
            elif primary_node.getUUID() == primary_uuid:
                # Whatever the situation is, I trust this master.
                app.primary_master_node = primary_node

    @identification_required
    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Update the node manager from a node list sent by a master.

        Each entry is (node_type, ip_address, port, uuid, state). A known
        node whose UUID or address no longer matches the entry is removed
        and recreated. Client entries, storage entries without a valid
        UUID, and entries of any other type that match no known node are
        ignored.
        """
        uuid = conn.getUUID()
        app = self.app
        nm = app.nm
        node = nm.getNodeByUUID(uuid)
        # This must be sent only by a primary master node.
        # Note that this may be sent before I know that it is
        # a primary master node.
        if node.getNodeType() != MASTER_NODE_TYPE:
            logging.warning('ignoring notify node information from %s',
                            dump(uuid))
            return
        for node_type, ip_address, port, uuid, state in node_list:
            # Register new nodes.
            addr = (ip_address, port)
            # Try to retrieve it from nm
            n = None
            if uuid != INVALID_UUID:
                n = nm.getNodeByUUID(uuid)
            if n is None:
                n = nm.getNodeByServer(addr)
                if n is not None and uuid != INVALID_UUID:
                    # node only exists by address, remove it
                    nm.remove(n)
                    n = None
            elif n.getServer() != addr:
                # same uuid but different address, remove it
                nm.remove(n)
                n = None

            if node_type == MASTER_NODE_TYPE:
                if n is None:
                    n = MasterNode(server = addr)
                    nm.add(n)
                if uuid != INVALID_UUID:
                    # If I don't know the UUID yet, believe what the peer
                    # told me at the moment.
                    if n.getUUID() is None:
                        n.setUUID(uuid)
            elif node_type == STORAGE_NODE_TYPE:
                if uuid == INVALID_UUID:
                    # No interest.
                    continue
                if n is None:
                    n = StorageNode(server = addr, uuid = uuid)
                    nm.add(n)
            elif node_type == CLIENT_NODE_TYPE:
                continue

            if n is None:
                # Node type not handled above and no matching known node:
                # there is nothing to update.
                continue
            n.setState(state)

    @identification_required
    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        """Load the partition table rows sent by the primary master.

        If `ptid` differs from app.ptid, the table is cleared first. Storage
        nodes referenced by a cell but unknown to the node manager are
        created in TEMPORARILY_DOWN_STATE.
        """
        # This handler is in PrimaryBootstrapHandler, since this
        # basically is an answer to askPrimaryMaster.
        # Extract from P-NEO-Protocol.Description:
        #  Connection to primary master node (PMN in service state)
        #   CN -> PMN : askPrimaryMaster
        #   PMN -> CN : answerPrimaryMaster containing primary uuid and no
        #               known master list
        #   PMN -> CN : notifyNodeInformation containing a list of nodes
        #   PMN -> CN : sendPartitionTable containing partition table id and
        #               list of rows
        # notifyNodeInformation is valid as an asynchronous event, but
        # sendPartitionTable is only triggered after askPrimaryMaster.
        uuid = conn.getUUID()
        app = self.app
        nm = app.nm
        pt = app.pt
        node = nm.getNodeByUUID(uuid)
        # This must be sent only by primary master node
        if node.getNodeType() != MASTER_NODE_TYPE:
            return

        if app.ptid != ptid:
            app.ptid = ptid
            pt.clear()
        for offset, row in row_list:
            for uuid, state in row:
                node = nm.getNodeByUUID(uuid)
                if node is None:
                    node = StorageNode(uuid = uuid)
                    node.setState(TEMPORARILY_DOWN_STATE)
                    nm.add(node)
                pt.setCell(offset, node, state)


259 260 261 262 263 264 265 266

class PrimaryNotificationsHandler(EventHandler):
    """Handler that processes the notifications from the primary master."""

    # Notifications are handled directly, so no dispatcher is needed.
    def __init__(self, app):
        self.app = app
        EventHandler.__init__(self)

    def connectionClosed(self, conn):
        """Drop the master connection and look for a primary master again."""
        logging.critical("connection to primary master node closed")
        # Close connection
        app = self.app
        app.master_conn.close()
        app.master_conn = None
        app.primary_master_node = None
        app.connectToPrimaryMasterNode()
        EventHandler.connectionClosed(self, conn)

    def timeoutExpired(self, conn):
        logging.critical("connection timeout to primary master node expired")
        self.app.connectToPrimaryMasterNode()
        EventHandler.timeoutExpired(self, conn)

    def peerBroken(self, conn):
        logging.critical("primary master node is broken")
        self.app.connectToPrimaryMasterNode()
        EventHandler.peerBroken(self, conn)

    def handleStopOperation(self, conn, packet):
        # Only logged; no further action is taken here.
        logging.critical("master node ask to stop operation")

    def handleInvalidateObjects(self, conn, packet, oid_list, tid):
        """Invalidate `oid_list` at `tid`.

        Under the cache lock, each oid is dropped from app.mq_cache (if
        present) and, when app.getDB() returns a database, the
        invalidation is forwarded to it.
        """
        app = self.app
        app._cache_lock_acquire()
        try:
            # ZODB requires a dict with oid as key, so create it
            oids = {}
            for oid in oid_list:
                oids[oid] = tid
                try:
                    del app.mq_cache[oid]
                except KeyError:
                    pass
            db = app.getDB()
            if db is not None:
                db.invalidate(tid, oids)
        finally:
            app._cache_lock_release()

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        """Update the node manager from (type, ip, port, uuid, state) entries.

        Master nodes are looked up by address, storage nodes by UUID
        (their address is refreshed). Storage entries without a valid UUID,
        client entries and entries of any other type are ignored.
        """
        app = self.app
        nm = app.nm
        for node_type, ip_address, port, uuid, state in node_list:
            # Register new nodes.
            addr = (ip_address, port)

            if node_type == MASTER_NODE_TYPE:
                n = nm.getNodeByServer(addr)
                if n is None:
                    n = MasterNode(server = addr)
                    nm.add(n)
                if uuid != INVALID_UUID:
                    # If I don't know the UUID yet, believe what the peer
                    # told me at the moment.
                    if n.getUUID() is None:
                        n.setUUID(uuid)
            elif node_type == STORAGE_NODE_TYPE:
                if uuid == INVALID_UUID:
                    # No interest.
                    continue
                n = nm.getNodeByUUID(uuid)
                if n is None:
                    n = StorageNode(server = addr, uuid = uuid)
                    nm.add(n)
                else:
                    n.setServer(addr)
            else:
                # Client nodes are of no interest, and other node types are
                # not tracked here. Skipping is required: otherwise `n`
                # would be unbound or still refer to a previous entry's node.
                continue

            n.setState(state)

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        """Apply partition cell changes unless `ptid` is not newer.

        Nodes referenced by a cell but unknown to the node manager are
        created as storage nodes; unless the UUID is our own, they start in
        TEMPORARILY_DOWN_STATE.
        """
        app = self.app
        nm = app.nm
        pt = app.pt

        if app.ptid >= ptid:
            # Ignore this packet.
            return
        app.ptid = ptid
        for offset, uuid, state in cell_list:
            node = nm.getNodeByUUID(uuid)
            if node is None:
                node = StorageNode(uuid = uuid)
                if uuid != app.uuid:
                    node.setState(TEMPORARILY_DOWN_STATE)
                nm.add(node)
            pt.setCell(offset, node, state)
Aurel's avatar
Aurel committed
359

360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383

class PrimaryAnswersHandler(BaseHandler):
    """Handler for the answers expected from the primary master."""

    def connectionClosed(self, conn):
        """Forget the current master connection and reconnect."""
        logging.critical("connection to primary master node closed")
        app = self.app
        master_conn = app.master_conn
        master_conn.close()
        app.master_conn = None
        app.primary_master_node = None
        app.connectToPrimaryMasterNode()
        super(PrimaryAnswersHandler, self).connectionClosed(conn)

    def timeoutExpired(self, conn):
        """Reconnect to a primary master after a timeout."""
        logging.critical("connection timeout to primary master node expired")
        self.app.connectToPrimaryMasterNode()
        super(PrimaryAnswersHandler, self).timeoutExpired(conn)

    def peerBroken(self, conn):
        """Reconnect to a primary master after the peer broke."""
        logging.critical("primary master node is broken")
        self.app.connectToPrimaryMasterNode()
        super(PrimaryAnswersHandler, self).peerBroken(conn)

    def handleAnswerNewTID(self, conn, packet, tid):
        """Hand the TID returned by the master to the application."""
        self.app.setTID(tid)

    def handleAnswerNewOIDs(self, conn, packet, oid_list):
        """Store the new OIDs on the app, reversing the list in place."""
        oid_list.reverse()
        self.app.new_oid_list = oid_list

    def handleNotifyTransactionFinished(self, conn, packet, tid):
        """Mark the current transaction finished if `tid` is ours."""
        app = self.app
        if tid != app.getTID():
            return
        app.setTransactionFinished()
Aurel's avatar
Aurel committed
397

398

399
class StorageBaseHandler(BaseHandler):
    """Common failure handling for connections to storage nodes."""

    def _dealWithStorageFailure(self, conn, node, state):
        """React to the loss of connection `conn` to storage `node`.

        The node is removed from the connection pool, every queue the
        dispatcher has registered for `conn` is removed from its message
        table and fed a fake (conn, None) packet, and, when a master
        connection exists, the primary master is notified that the node is
        now in `state`.
        """
        app = self.app

        # Remove from pool connection
        app.cp.removeConnection(node)

        # Collect the queues registered for this connection (message_table
        # keys start with id(conn)), then put a fake packet on each one.
        # keys() returns a list copy under Python 2, so popping is safe.
        message_table = self.dispatcher.message_table
        conn_id = id(conn)
        pending_queues = set()
        for key in message_table.keys():
            if key[0] == conn_id:
                pending_queues.add(message_table.pop(key))
        for pending_queue in pending_queues:
            pending_queue.put((conn, None))

        # Notify the primary master node of the failure.
        master_conn = app.master_conn
        if master_conn is None:
            return
        master_conn.lock()
        try:
            ip_address, port = node.getServer()
            failed_node = (STORAGE_NODE_TYPE, ip_address, port,
                           node.getUUID(), state)
            master_conn.notify(protocol.notifyNodeInformation([failed_node]))
        finally:
            master_conn.unlock()

    def _getNodeForConnection(self, conn):
        # Storage nodes are looked up by the peer address of the connection.
        return self.app.nm.getNodeByServer(conn.getAddress())

    def connectionClosed(self, conn):
        node = self._getNodeForConnection(conn)
        logging.info("connection to storage node %s closed", node.getServer())
        self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
        super(StorageBaseHandler, self).connectionClosed(conn)

    def timeoutExpired(self, conn):
        self._dealWithStorageFailure(conn, self._getNodeForConnection(conn),
                                     TEMPORARILY_DOWN_STATE)
        super(StorageBaseHandler, self).timeoutExpired(conn)

    def peerBroken(self, conn):
        self._dealWithStorageFailure(conn, self._getNodeForConnection(conn),
                                     BROKEN_STATE)
        super(StorageBaseHandler, self).peerBroken(conn)


class StorageBootstrapHandler(StorageBaseHandler):
    """Handler used when connecting to a storage node."""

    def connectionFailed(self, conn):
        # Connection to a storage node failed
        node = self.app.nm.getNodeByServer(conn.getAddress())
        self._dealWithStorageFailure(conn, node, TEMPORARILY_DOWN_STATE)
        super(StorageBootstrapHandler, self).connectionFailed(conn)

    def handleNotReady(self, conn, packet, message):
        """The storage answered "not ready": flag the node as not ready."""
        app = self.app
        app.setNodeNotReady()

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
           uuid, ip_address, port, num_partitions, num_replicas, your_uuid):
        """Complete identification with a storage node.

        The connection is closed if the peer is not a storage node, or if
        it announces a server address other than the one we connected to
        (the node is then also removed from the node manager). Otherwise
        the peer's UUID is recorded on both the connection and the node.
        `num_partitions`, `num_replicas` and `your_uuid` are not used here.
        """
        app = self.app
        node = app.nm.getNodeByServer(conn.getAddress())
        # Only a storage node is accepted on this connection.
        if node_type != STORAGE_NODE_TYPE:
            conn.close()
            return
        if conn.getAddress() != (ip_address, port):
            # The server address is different! Then why was
            # the connection successful?
            logging.error('%s:%d is waiting for %s:%d',
                  conn.getAddress()[0], conn.getAddress()[1], ip_address, port)
            app.nm.remove(node)
            conn.close()
            return

        conn.setUUID(uuid)
        node.setUUID(uuid)


480 481
class StorageAnswersHandler(StorageBaseHandler):
    """Handle storage answers to ZODB-related requests.

    Every answer is stored as an attribute of app.local_var, where the
    requesting code reads it back (local_var is presumably per-thread
    state - confirm in the application).
    """

    def handleAnswerObject(self, conn, packet, oid, start_serial, end_serial,
            compression, checksum, data):
        """Store the loaded object description as a 6-tuple."""
        self.app.local_var.asked_object = (oid, start_serial, end_serial,
                                           compression, checksum, data)

    def handleAnswerStoreObject(self, conn, packet, conflicting, oid, serial):
        """Store (oid, serial), with -1 instead of the oid on conflict."""
        local_var = self.app.local_var
        if not conflicting:
            local_var.object_stored = oid, serial
        else:
            local_var.object_stored = -1, serial

    def handleAnswerStoreTransaction(self, conn, packet, tid):
        """The storage accepted the transaction: mark it voted."""
        self.app.setTransactionVoted()

    def handleAnswerTransactionInformation(self, conn, packet, tid,
                                           user, desc, ext, oid_list):
        """Store the transaction information as a dict (`ext` is unused)."""
        self.app.local_var.txn_info = {
            'time': TimeStamp(tid).timeTime(),
            'user_name': user,
            'description': desc,
            'id': tid,
            'oids': oid_list,
        }

    def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
        """Store (oid, history_list); entries are (serial, size) tuples."""
        self.app.local_var.history = oid, history_list

    def handleOidNotFound(self, conn, packet, message):
        # Both an object load and a history request can receive this
        # answer, so both results are marked as not found.
        local_var = self.app.local_var
        local_var.asked_object = -1
        local_var.history = -1

    def handleTidNotFound(self, conn, packet, message):
        # Received when asking for transaction information.
        self.app.local_var.txn_info = -1

    def handleAnswerTIDs(self, conn, packet, tid_list):
        """Record the TID list returned by this storage, keyed by its UUID."""
        self.app.local_var.node_tids[conn.getUUID()] = tid_list
533

534