#
# Copyright (C) 2006-2009  Nexedi SA
# 
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

import logging
import sys

from neo import protocol
from neo.protocol import Packet, PacketMalformedError
from neo.connection import ServerConnection

from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
        PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
        REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
        STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
        ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
        ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
        ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
        DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_NEW_TID, ANSWER_NEW_TID, \
        FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
        NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
        ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
        ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
        ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
        ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
        ASK_OIDS, ANSWER_OIDS, \
        NOT_READY_CODE, OID_NOT_FOUND_CODE, SERIAL_NOT_FOUND_CODE, TID_NOT_FOUND_CODE, \
        PROTOCOL_ERROR_CODE, TIMEOUT_ERROR_CODE, BROKEN_NODE_DISALLOWED_CODE, \
        INTERNAL_ERROR_CODE

class EventHandler(object):
    """Base handler for connection events and incoming packets.

    Connection events (started, completed, failed, accepted, closed,
    timeout) are logged.  Incoming packets are routed by ``dispatch``
    through ``packet_dispatch_table`` to a ``handle*`` method.  In this
    class every packet type except ERROR, PING and PONG is rejected as
    unexpected; presumably subclasses override the handlers for the
    packets they actually support.

    ERROR packets are routed a second time, by error code, through
    ``error_dispatch_table``.
    """

    def __init__(self):
        """Build the packet and error dispatch tables."""
        self.initPacketDispatchTable()
        self.initErrorDispatchTable()

    # Connection events.

    def connectionStarted(self, conn):
        """Called when a connection is started."""
        logging.debug('connection started for %s:%d', *(conn.getAddress()))

    def connectionCompleted(self, conn):
        """Called when a connection is completed."""
        logging.debug('connection completed for %s:%d', *(conn.getAddress()))

    def connectionFailed(self, conn):
        """Called when a connection failed."""
        logging.debug('connection failed for %s:%d', *(conn.getAddress()))

    def connectionAccepted(self, conn, connector, addr):
        """Called when a connection is accepted.

        Wraps the accepted ``connector`` in a ServerConnection that shares
        the event manager and handler of the accepting connection ``conn``.
        """
        logging.debug('connection accepted from %s:%d', *addr)
        new_conn = ServerConnection(conn.getEventManager(), conn.getHandler(),
                                    connector = connector, addr = addr)
        # A request for a node identification should arrive.
        new_conn.expectMessage(timeout = 10, additional_timeout = 0)

    def timeoutExpired(self, conn):
        """Called when a timeout event occurs."""
        logging.debug('timeout expired for %s:%d', *(conn.getAddress()))

    def connectionClosed(self, conn):
        """Called when a connection is closed by the peer."""
        logging.debug('connection closed for %s:%d', *(conn.getAddress()))

    def packetReceived(self, conn, packet):
        """Called when a packet is received."""
        self.dispatch(conn, packet)

    def packetMalformed(self, conn, packet, error_message):
        """Called when a packet is malformed.

        Logs the problem, notifies the peer with a protocol error carrying
        ``error_message``, aborts the connection and reports the peer as
        broken.
        """
        address = conn.getAddress()
        logging.info('malformed packet %x from %s:%d: %s',
                     packet.getType(), address[0], address[1], error_message)
        conn.notify(protocol.protocolError(error_message))
        conn.abort()
        self.peerBroken(conn)

    def peerBroken(self, conn):
        """Called when a peer is broken."""
        logging.error('%s:%d is broken', *(conn.getAddress()))

    def dispatch(self, conn, packet):
        """Route a packet to the handler registered for its type.

        The handler is called as ``method(conn, packet, *args)`` where
        ``args`` is whatever ``packet.decode()`` returns (a falsy result
        means "no extra arguments").

        A KeyError or ValueError raised anywhere in that sequence -- an
        unregistered packet type, but also one escaping from the handler
        itself -- is reported through ``handleUnexpectedPacket``.  A
        PacketMalformedError is reported through ``packetMalformed`` using
        the second argument of the exception as the message.
        """
        t = packet.getType()
        try:
            method = self.packet_dispatch_table[t]
            args = packet.decode() or ()
            method(conn, packet, *args)
        except (KeyError, ValueError):
            self.handleUnexpectedPacket(conn, packet)
        except PacketMalformedError:
            # Fetch the exception through sys.exc_info() instead of the
            # ``except X, m`` binding: it behaves the same on Python 2 and
            # keeps this module parsable by newer interpreters.
            error = sys.exc_info()[1]
            self.packetMalformed(conn, packet, error.args[1])

    def handleUnexpectedPacket(self, conn, packet, message = None):
        """Handle an unexpected packet.

        Logs ``message`` (or a default text built from the packet type),
        notifies the peer with a protocol error, aborts the connection and
        reports the peer as broken.
        """
        if message is None:
            message = 'unexpected packet type %s' % packet.getType()
        else:
            message = 'unexpected packet: ' + message
        logging.info('%s', message)
        conn.notify(protocol.protocolError(message))
        conn.abort()
        self.peerBroken(conn)

    # Packet handlers.

    def handleError(self, conn, packet, code, message):
        """Route an ERROR packet to the handler registered for ``code``.

        An error code missing from ``error_dispatch_table`` raises KeyError
        on lookup; it is treated, like ValueError, as an unexpected packet
        so that the peer's ``message`` is preserved in the report.
        """
        try:
            method = self.error_dispatch_table[code]
            method(conn, packet, message)
        except (KeyError, ValueError):
            self.handleUnexpectedPacket(conn, packet, message)

    def handleRequestNodeIdentification(self, conn, packet, node_type,
                                        uuid, ip_address, port, name):
        self.handleUnexpectedPacket(conn, packet)

    def handleAcceptNodeIdentification(self, conn, packet, node_type,
                                       uuid, ip_address, port,
                                       num_partitions, num_replicas, your_uuid):
        self.handleUnexpectedPacket(conn, packet)

    def handlePing(self, conn, packet):
        """Answer a PING packet with a PONG."""
        logging.info('got a ping packet; am I overloaded?')
        conn.answer(protocol.pong(), packet)

    def handlePong(self, conn, packet):
        """Accept a PONG packet; nothing to do."""
        pass

    def handleAskPrimaryMaster(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerPrimaryMaster(self, conn, packet, primary_uuid,
                                  known_master_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnnouncePrimaryMaster(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleReelectPrimaryMaster(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyNodeInformation(self, conn, packet, node_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskLastIDs(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerLastIDs(self, conn, packet, loid, ltid, lptid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskPartitionTable(self, conn, packet, offset_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerPartitionTable(self, conn, packet, ptid, row_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleSendPartitionTable(self, conn, packet, ptid, row_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyPartitionChanges(self, conn, packet, ptid, cell_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleStartOperation(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleStopOperation(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskUnfinishedTransactions(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerUnfinishedTransactions(self, conn, packet, tid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskObjectPresent(self, conn, packet, oid, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerObjectPresent(self, conn, packet, oid, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleDeleteTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleCommitTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskNewTID(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerNewTID(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskNewOIDs(self, conn, packet):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerNewOIDs(self, conn, packet, oid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleFinishTransaction(self, conn, packet, oid_list, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyTransactionFinished(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleLockInformation(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleNotifyInformationLocked(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleInvalidateObjects(self, conn, packet, oid_list, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleUnlockInformation(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskStoreObject(self, conn, packet, oid, serial,
                             compression, checksum, data, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerStoreObject(self, conn, packet, status, oid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAbortTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskStoreTransaction(self, conn, packet, tid, user, desc,
                                  ext, oid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerStoreTransaction(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskObject(self, conn, packet, oid, serial, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerObject(self, conn, packet, oid, serial_start,
                           serial_end, compression, checksum, data):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskTIDs(self, conn, packet, first, last, partition):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerTIDs(self, conn, packet, tid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskTransactionInformation(self, conn, packet, tid):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerTransactionInformation(self, conn, packet, tid,
                                           user, desc, ext, oid_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskObjectHistory(self, conn, packet, oid, first, last):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerObjectHistory(self, conn, packet, oid, history_list):
        self.handleUnexpectedPacket(conn, packet)

    def handleAskOIDs(self, conn, packet, first, last, partition):
        self.handleUnexpectedPacket(conn, packet)

    def handleAnswerOIDs(self, conn, packet, oid_list):
        self.handleUnexpectedPacket(conn, packet)

    # Error packet handlers.
    #
    # These four are aliases: they are called as (conn, packet, message),
    # which matches the signature of handleUnexpectedPacket.

    handleNotReady = handleUnexpectedPacket
    handleOidNotFound = handleUnexpectedPacket
    handleSerialNotFound = handleUnexpectedPacket
    handleTidNotFound = handleUnexpectedPacket

    def handleProtocolError(self, conn, packet, message):
        """Raise RuntimeError for a protocol error reported by the peer."""
        raise RuntimeError('protocol error: %s' % (message,))

    def handleTimeoutError(self, conn, packet, message):
        """Raise RuntimeError for a timeout error reported by the peer."""
        raise RuntimeError('timeout error: %s' % (message,))

    def handleBrokenNodeDisallowedError(self, conn, packet, message):
        """Raise RuntimeError for a broken-node-disallowed error."""
        raise RuntimeError('broken node disallowed error: %s' % (message,))

    def handleInternalError(self, conn, packet, message):
        """Report the peer as broken and close the connection."""
        self.peerBroken(conn)
        conn.close()

    def initPacketDispatchTable(self):
        """Map every known packet type to its ``handle*`` bound method."""
        self.packet_dispatch_table = {
            ERROR: self.handleError,
            REQUEST_NODE_IDENTIFICATION: self.handleRequestNodeIdentification,
            ACCEPT_NODE_IDENTIFICATION: self.handleAcceptNodeIdentification,
            PING: self.handlePing,
            PONG: self.handlePong,
            ASK_PRIMARY_MASTER: self.handleAskPrimaryMaster,
            ANSWER_PRIMARY_MASTER: self.handleAnswerPrimaryMaster,
            ANNOUNCE_PRIMARY_MASTER: self.handleAnnouncePrimaryMaster,
            REELECT_PRIMARY_MASTER: self.handleReelectPrimaryMaster,
            NOTIFY_NODE_INFORMATION: self.handleNotifyNodeInformation,
            ASK_LAST_IDS: self.handleAskLastIDs,
            ANSWER_LAST_IDS: self.handleAnswerLastIDs,
            ASK_PARTITION_TABLE: self.handleAskPartitionTable,
            ANSWER_PARTITION_TABLE: self.handleAnswerPartitionTable,
            SEND_PARTITION_TABLE: self.handleSendPartitionTable,
            NOTIFY_PARTITION_CHANGES: self.handleNotifyPartitionChanges,
            START_OPERATION: self.handleStartOperation,
            STOP_OPERATION: self.handleStopOperation,
            ASK_UNFINISHED_TRANSACTIONS: self.handleAskUnfinishedTransactions,
            ANSWER_UNFINISHED_TRANSACTIONS:
                self.handleAnswerUnfinishedTransactions,
            ASK_OBJECT_PRESENT: self.handleAskObjectPresent,
            ANSWER_OBJECT_PRESENT: self.handleAnswerObjectPresent,
            DELETE_TRANSACTION: self.handleDeleteTransaction,
            COMMIT_TRANSACTION: self.handleCommitTransaction,
            ASK_NEW_TID: self.handleAskNewTID,
            ANSWER_NEW_TID: self.handleAnswerNewTID,
            FINISH_TRANSACTION: self.handleFinishTransaction,
            NOTIFY_TRANSACTION_FINISHED: self.handleNotifyTransactionFinished,
            LOCK_INFORMATION: self.handleLockInformation,
            NOTIFY_INFORMATION_LOCKED: self.handleNotifyInformationLocked,
            INVALIDATE_OBJECTS: self.handleInvalidateObjects,
            UNLOCK_INFORMATION: self.handleUnlockInformation,
            ASK_NEW_OIDS: self.handleAskNewOIDs,
            ANSWER_NEW_OIDS: self.handleAnswerNewOIDs,
            ASK_STORE_OBJECT: self.handleAskStoreObject,
            ANSWER_STORE_OBJECT: self.handleAnswerStoreObject,
            ABORT_TRANSACTION: self.handleAbortTransaction,
            ASK_STORE_TRANSACTION: self.handleAskStoreTransaction,
            ANSWER_STORE_TRANSACTION: self.handleAnswerStoreTransaction,
            ASK_OBJECT: self.handleAskObject,
            ANSWER_OBJECT: self.handleAnswerObject,
            ASK_TIDS: self.handleAskTIDs,
            ANSWER_TIDS: self.handleAnswerTIDs,
            ASK_TRANSACTION_INFORMATION: self.handleAskTransactionInformation,
            ANSWER_TRANSACTION_INFORMATION:
                self.handleAnswerTransactionInformation,
            ASK_OBJECT_HISTORY: self.handleAskObjectHistory,
            ANSWER_OBJECT_HISTORY: self.handleAnswerObjectHistory,
            ASK_OIDS: self.handleAskOIDs,
            ANSWER_OIDS: self.handleAnswerOIDs,
        }

    def initErrorDispatchTable(self):
        """Map every known error code to its error handler bound method."""
        self.error_dispatch_table = {
            NOT_READY_CODE: self.handleNotReady,
            OID_NOT_FOUND_CODE: self.handleOidNotFound,
            SERIAL_NOT_FOUND_CODE: self.handleSerialNotFound,
            TID_NOT_FOUND_CODE: self.handleTidNotFound,
            PROTOCOL_ERROR_CODE: self.handleProtocolError,
            TIMEOUT_ERROR_CODE: self.handleTimeoutError,
            BROKEN_NODE_DISALLOWED_CODE: self.handleBrokenNodeDisallowedError,
            INTERNAL_ERROR_CODE: self.handleInternalError,
        }