Commit ef7eedd3 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Use new Enum class for packet types. Drop the old Enum, this job is done.


git-svn-id: https://svn.erp5.org/repos/neo/trunk@1343 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent cf37fdd3
......@@ -19,7 +19,7 @@ from neo import logging
from neo.locking import RLock
from neo import protocol
from neo.protocol import PacketMalformedError
from neo.protocol import PacketMalformedError, PacketTypes
from neo.event import IdleEvent
from neo.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException, \
......@@ -256,10 +256,10 @@ class Connection(BaseConnection):
try:
packet_type = packet.getType()
if packet_type == protocol.PING:
if packet_type == PacketTypes.PING:
# Send a pong notification
self.answer(protocol.pong(), packet.getId())
elif packet_type != protocol.PONG:
elif packet_type != PacketTypes.PONG:
# Skip PONG packets, its only purpose is to drop IdleEvent
# generated upong ping.
self._queue.append(packet)
......
......@@ -17,29 +17,9 @@
from neo import logging
from neo import protocol
from neo.protocol import NodeStates, ErrorCodes
from neo.protocol import NodeStates, ErrorCodes, PacketTypes
from neo.protocol import PacketMalformedError, UnexpectedPacketError, \
BrokenNodeDisallowedError, NotReadyError, ProtocolError
from protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS, ADD_PENDING_NODES, ANSWER_NEW_NODES, \
ASK_PARTITION_LIST, ANSWER_PARTITION_LIST, ASK_NODE_LIST, \
ANSWER_NODE_LIST, SET_NODE_STATE, ANSWER_NODE_STATE, SET_CLUSTER_STATE, \
ASK_NODE_INFORMATION, ANSWER_NODE_INFORMATION, \
ASK_CLUSTER_STATE, ANSWER_CLUSTER_STATE, NOTIFY_CLUSTER_INFORMATION, \
NOTIFY_LAST_OID
class EventHandler(object):
......@@ -392,68 +372,68 @@ class EventHandler(object):
def initPacketDispatchTable(self):
d = {}
d[ERROR] = self.handleError
d[REQUEST_NODE_IDENTIFICATION] = self.handleRequestNodeIdentification
d[ACCEPT_NODE_IDENTIFICATION] = self.handleAcceptNodeIdentification
d[ASK_PRIMARY_MASTER] = self.handleAskPrimaryMaster
d[ANSWER_PRIMARY_MASTER] = self.handleAnswerPrimaryMaster
d[ANNOUNCE_PRIMARY_MASTER] = self.handleAnnouncePrimaryMaster
d[REELECT_PRIMARY_MASTER] = self.handleReelectPrimaryMaster
d[NOTIFY_NODE_INFORMATION] = self.handleNotifyNodeInformation
d[ASK_LAST_IDS] = self.handleAskLastIDs
d[ANSWER_LAST_IDS] = self.handleAnswerLastIDs
d[ASK_PARTITION_TABLE] = self.handleAskPartitionTable
d[ANSWER_PARTITION_TABLE] = self.handleAnswerPartitionTable
d[SEND_PARTITION_TABLE] = self.handleSendPartitionTable
d[NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges
d[START_OPERATION] = self.handleStartOperation
d[STOP_OPERATION] = self.handleStopOperation
d[ASK_UNFINISHED_TRANSACTIONS] = self.handleAskUnfinishedTransactions
d[ANSWER_UNFINISHED_TRANSACTIONS] = self.handleAnswerUnfinishedTransactions
d[ASK_OBJECT_PRESENT] = self.handleAskObjectPresent
d[ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
d[DELETE_TRANSACTION] = self.handleDeleteTransaction
d[COMMIT_TRANSACTION] = self.handleCommitTransaction
d[ASK_BEGIN_TRANSACTION] = self.handleAskBeginTransaction
d[ANSWER_BEGIN_TRANSACTION] = self.handleAnswerBeginTransaction
d[FINISH_TRANSACTION] = self.handleFinishTransaction
d[NOTIFY_TRANSACTION_FINISHED] = self.handleNotifyTransactionFinished
d[LOCK_INFORMATION] = self.handleLockInformation
d[NOTIFY_INFORMATION_LOCKED] = self.handleNotifyInformationLocked
d[INVALIDATE_OBJECTS] = self.handleInvalidateObjects
d[UNLOCK_INFORMATION] = self.handleUnlockInformation
d[ASK_NEW_OIDS] = self.handleAskNewOIDs
d[ANSWER_NEW_OIDS] = self.handleAnswerNewOIDs
d[ASK_STORE_OBJECT] = self.handleAskStoreObject
d[ANSWER_STORE_OBJECT] = self.handleAnswerStoreObject
d[ABORT_TRANSACTION] = self.handleAbortTransaction
d[ASK_STORE_TRANSACTION] = self.handleAskStoreTransaction
d[ANSWER_STORE_TRANSACTION] = self.handleAnswerStoreTransaction
d[ASK_OBJECT] = self.handleAskObject
d[ANSWER_OBJECT] = self.handleAnswerObject
d[ASK_TIDS] = self.handleAskTIDs
d[ANSWER_TIDS] = self.handleAnswerTIDs
d[ASK_TRANSACTION_INFORMATION] = self.handleAskTransactionInformation
d[ANSWER_TRANSACTION_INFORMATION] = self.handleAnswerTransactionInformation
d[ASK_OBJECT_HISTORY] = self.handleAskObjectHistory
d[ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory
d[ASK_OIDS] = self.handleAskOIDs
d[ANSWER_OIDS] = self.handleAnswerOIDs
d[ASK_PARTITION_LIST] = self.handleAskPartitionList
d[ANSWER_PARTITION_LIST] = self.handleAnswerPartitionList
d[ASK_NODE_LIST] = self.handleAskNodeList
d[ANSWER_NODE_LIST] = self.handleAnswerNodeList
d[SET_NODE_STATE] = self.handleSetNodeState
d[ANSWER_NODE_STATE] = self.handleAnswerNodeState
d[SET_CLUSTER_STATE] = self.handleSetClusterState
d[ADD_PENDING_NODES] = self.handleAddPendingNodes
d[ANSWER_NEW_NODES] = self.handleAnswerNewNodes
d[ASK_NODE_INFORMATION] = self.handleAskNodeInformation
d[ANSWER_NODE_INFORMATION] = self.handleAnswerNodeInformation
d[ASK_CLUSTER_STATE] = self.handleAskClusterState
d[ANSWER_CLUSTER_STATE] = self.handleAnswerClusterState
d[NOTIFY_CLUSTER_INFORMATION] = self.handleNotifyClusterInformation
d[NOTIFY_LAST_OID] = self.handleNotifyLastOID
d[PacketTypes.ERROR] = self.handleError
d[PacketTypes.REQUEST_NODE_IDENTIFICATION] = self.handleRequestNodeIdentification
d[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = self.handleAcceptNodeIdentification
d[PacketTypes.ASK_PRIMARY_MASTER] = self.handleAskPrimaryMaster
d[PacketTypes.ANSWER_PRIMARY_MASTER] = self.handleAnswerPrimaryMaster
d[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = self.handleAnnouncePrimaryMaster
d[PacketTypes.REELECT_PRIMARY_MASTER] = self.handleReelectPrimaryMaster
d[PacketTypes.NOTIFY_NODE_INFORMATION] = self.handleNotifyNodeInformation
d[PacketTypes.ASK_LAST_IDS] = self.handleAskLastIDs
d[PacketTypes.ANSWER_LAST_IDS] = self.handleAnswerLastIDs
d[PacketTypes.ASK_PARTITION_TABLE] = self.handleAskPartitionTable
d[PacketTypes.ANSWER_PARTITION_TABLE] = self.handleAnswerPartitionTable
d[PacketTypes.SEND_PARTITION_TABLE] = self.handleSendPartitionTable
d[PacketTypes.NOTIFY_PARTITION_CHANGES] = self.handleNotifyPartitionChanges
d[PacketTypes.START_OPERATION] = self.handleStartOperation
d[PacketTypes.STOP_OPERATION] = self.handleStopOperation
d[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = self.handleAskUnfinishedTransactions
d[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = self.handleAnswerUnfinishedTransactions
d[PacketTypes.ASK_OBJECT_PRESENT] = self.handleAskObjectPresent
d[PacketTypes.ANSWER_OBJECT_PRESENT] = self.handleAnswerObjectPresent
d[PacketTypes.DELETE_TRANSACTION] = self.handleDeleteTransaction
d[PacketTypes.COMMIT_TRANSACTION] = self.handleCommitTransaction
d[PacketTypes.ASK_BEGIN_TRANSACTION] = self.handleAskBeginTransaction
d[PacketTypes.ANSWER_BEGIN_TRANSACTION] = self.handleAnswerBeginTransaction
d[PacketTypes.FINISH_TRANSACTION] = self.handleFinishTransaction
d[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = self.handleNotifyTransactionFinished
d[PacketTypes.LOCK_INFORMATION] = self.handleLockInformation
d[PacketTypes.NOTIFY_INFORMATION_LOCKED] = self.handleNotifyInformationLocked
d[PacketTypes.INVALIDATE_OBJECTS] = self.handleInvalidateObjects
d[PacketTypes.UNLOCK_INFORMATION] = self.handleUnlockInformation
d[PacketTypes.ASK_NEW_OIDS] = self.handleAskNewOIDs
d[PacketTypes.ANSWER_NEW_OIDS] = self.handleAnswerNewOIDs
d[PacketTypes.ASK_STORE_OBJECT] = self.handleAskStoreObject
d[PacketTypes.ANSWER_STORE_OBJECT] = self.handleAnswerStoreObject
d[PacketTypes.ABORT_TRANSACTION] = self.handleAbortTransaction
d[PacketTypes.ASK_STORE_TRANSACTION] = self.handleAskStoreTransaction
d[PacketTypes.ANSWER_STORE_TRANSACTION] = self.handleAnswerStoreTransaction
d[PacketTypes.ASK_OBJECT] = self.handleAskObject
d[PacketTypes.ANSWER_OBJECT] = self.handleAnswerObject
d[PacketTypes.ASK_TIDS] = self.handleAskTIDs
d[PacketTypes.ANSWER_TIDS] = self.handleAnswerTIDs
d[PacketTypes.ASK_TRANSACTION_INFORMATION] = self.handleAskTransactionInformation
d[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = self.handleAnswerTransactionInformation
d[PacketTypes.ASK_OBJECT_HISTORY] = self.handleAskObjectHistory
d[PacketTypes.ANSWER_OBJECT_HISTORY] = self.handleAnswerObjectHistory
d[PacketTypes.ASK_OIDS] = self.handleAskOIDs
d[PacketTypes.ANSWER_OIDS] = self.handleAnswerOIDs
d[PacketTypes.ASK_PARTITION_LIST] = self.handleAskPartitionList
d[PacketTypes.ANSWER_PARTITION_LIST] = self.handleAnswerPartitionList
d[PacketTypes.ASK_NODE_LIST] = self.handleAskNodeList
d[PacketTypes.ANSWER_NODE_LIST] = self.handleAnswerNodeList
d[PacketTypes.SET_NODE_STATE] = self.handleSetNodeState
d[PacketTypes.ANSWER_NODE_STATE] = self.handleAnswerNodeState
d[PacketTypes.SET_CLUSTER_STATE] = self.handleSetClusterState
d[PacketTypes.ADD_PENDING_NODES] = self.handleAddPendingNodes
d[PacketTypes.ANSWER_NEW_NODES] = self.handleAnswerNewNodes
d[PacketTypes.ASK_NODE_INFORMATION] = self.handleAskNodeInformation
d[PacketTypes.ANSWER_NODE_INFORMATION] = self.handleAnswerNodeInformation
d[PacketTypes.ASK_CLUSTER_STATE] = self.handleAskClusterState
d[PacketTypes.ANSWER_CLUSTER_STATE] = self.handleAnswerClusterState
d[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = self.handleNotifyClusterInformation
d[PacketTypes.NOTIFY_LAST_OID] = self.handleNotifyLastOID
return d
......
......@@ -16,7 +16,7 @@
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from neo import logging
from neo import protocol
from neo.protocol import PacketTypes
from neo.util import dump
class PacketLogger(object):
......@@ -243,68 +243,68 @@ class PacketLogger(object):
# Fetch tables initialization
def initFetchTable(self):
d = {}
d[protocol.ERROR] = self.error
d[protocol.REQUEST_NODE_IDENTIFICATION] = self.requestNodeIdentification
d[protocol.ACCEPT_NODE_IDENTIFICATION] = self.acceptNodeIdentification
d[protocol.ASK_PRIMARY_MASTER] = self.askPrimaryMaster
d[protocol.ANSWER_PRIMARY_MASTER] = self.answerPrimaryMaster
d[protocol.ANNOUNCE_PRIMARY_MASTER] = self.announcePrimaryMaster
d[protocol.REELECT_PRIMARY_MASTER] = self.reelectPrimaryMaster
d[protocol.NOTIFY_NODE_INFORMATION] = self.notifyNodeInformation
d[protocol.ASK_LAST_IDS] = self.askLastIDs
d[protocol.ANSWER_LAST_IDS] = self.answerLastIDs
d[protocol.ASK_PARTITION_TABLE] = self.askPartitionTable
d[protocol.ANSWER_PARTITION_TABLE] = self.answerPartitionTable
d[protocol.SEND_PARTITION_TABLE] = self.sendPartitionTable
d[protocol.NOTIFY_PARTITION_CHANGES] = self.notifyPartitionChanges
d[protocol.START_OPERATION] = self.startOperation
d[protocol.STOP_OPERATION] = self.stopOperation
d[protocol.ASK_UNFINISHED_TRANSACTIONS] = self.askUnfinishedTransactions
d[protocol.ANSWER_UNFINISHED_TRANSACTIONS] = self.answerUnfinishedTransactions
d[protocol.ASK_OBJECT_PRESENT] = self.askObjectPresent
d[protocol.ANSWER_OBJECT_PRESENT] = self.answerObjectPresent
d[protocol.DELETE_TRANSACTION] = self.deleteTransaction
d[protocol.COMMIT_TRANSACTION] = self.commitTransaction
d[protocol.ASK_BEGIN_TRANSACTION] = self.askBeginTransaction
d[protocol.ANSWER_BEGIN_TRANSACTION] = self.answerBeginTransaction
d[protocol.FINISH_TRANSACTION] = self.finishTransaction
d[protocol.NOTIFY_TRANSACTION_FINISHED] = self.notifyTransactionFinished
d[protocol.LOCK_INFORMATION] = self.lockInformation
d[protocol.NOTIFY_INFORMATION_LOCKED] = self.notifyInformationLocked
d[protocol.INVALIDATE_OBJECTS] = self.invalidateObjects
d[protocol.UNLOCK_INFORMATION] = self.unlockInformation
d[protocol.ASK_NEW_OIDS] = self.askNewOIDs
d[protocol.ANSWER_NEW_OIDS] = self.answerNewOIDs
d[protocol.ASK_STORE_OBJECT] = self.askStoreObject
d[protocol.ANSWER_STORE_OBJECT] = self.answerStoreObject
d[protocol.ABORT_TRANSACTION] = self.abortTransaction
d[protocol.ASK_STORE_TRANSACTION] = self.askStoreTransaction
d[protocol.ANSWER_STORE_TRANSACTION] = self.answerStoreTransaction
d[protocol.ASK_OBJECT] = self.askObject
d[protocol.ANSWER_OBJECT] = self.answerObject
d[protocol.ASK_TIDS] = self.askTIDs
d[protocol.ANSWER_TIDS] = self.answerTIDs
d[protocol.ASK_TRANSACTION_INFORMATION] = self.askTransactionInformation
d[protocol.ANSWER_TRANSACTION_INFORMATION] = self.answerTransactionInformation
d[protocol.ASK_OBJECT_HISTORY] = self.askObjectHistory
d[protocol.ANSWER_OBJECT_HISTORY] = self.answerObjectHistory
d[protocol.ASK_OIDS] = self.askOIDs
d[protocol.ANSWER_OIDS] = self.answerOIDs
d[protocol.ASK_PARTITION_LIST] = self.askPartitionList
d[protocol.ANSWER_PARTITION_LIST] = self.answerPartitionList
d[protocol.ASK_NODE_LIST] = self.askNodeList
d[protocol.ANSWER_NODE_LIST] = self.answerNodeList
d[protocol.SET_NODE_STATE] = self.setNodeState
d[protocol.ANSWER_NODE_STATE] = self.answerNodeState
d[protocol.SET_CLUSTER_STATE] = self.setClusterState
d[protocol.ADD_PENDING_NODES] = self.addPendingNodes
d[protocol.ANSWER_NEW_NODES] = self.answerNewNodes
d[protocol.ASK_NODE_INFORMATION] = self.askNodeInformation
d[protocol.ANSWER_NODE_INFORMATION] = self.answerNodeInformation
d[protocol.ASK_CLUSTER_STATE] = self.askClusterState
d[protocol.ANSWER_CLUSTER_STATE] = self.answerClusterState
d[protocol.NOTIFY_CLUSTER_INFORMATION] = self.notifyClusterInformation
d[protocol.NOTIFY_LAST_OID] = self.notifyLastOID
d[PacketTypes.ERROR] = self.error
d[PacketTypes.REQUEST_NODE_IDENTIFICATION] = self.requestNodeIdentification
d[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = self.acceptNodeIdentification
d[PacketTypes.ASK_PRIMARY_MASTER] = self.askPrimaryMaster
d[PacketTypes.ANSWER_PRIMARY_MASTER] = self.answerPrimaryMaster
d[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = self.announcePrimaryMaster
d[PacketTypes.REELECT_PRIMARY_MASTER] = self.reelectPrimaryMaster
d[PacketTypes.NOTIFY_NODE_INFORMATION] = self.notifyNodeInformation
d[PacketTypes.ASK_LAST_IDS] = self.askLastIDs
d[PacketTypes.ANSWER_LAST_IDS] = self.answerLastIDs
d[PacketTypes.ASK_PARTITION_TABLE] = self.askPartitionTable
d[PacketTypes.ANSWER_PARTITION_TABLE] = self.answerPartitionTable
d[PacketTypes.SEND_PARTITION_TABLE] = self.sendPartitionTable
d[PacketTypes.NOTIFY_PARTITION_CHANGES] = self.notifyPartitionChanges
d[PacketTypes.START_OPERATION] = self.startOperation
d[PacketTypes.STOP_OPERATION] = self.stopOperation
d[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = self.askUnfinishedTransactions
d[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = self.answerUnfinishedTransactions
d[PacketTypes.ASK_OBJECT_PRESENT] = self.askObjectPresent
d[PacketTypes.ANSWER_OBJECT_PRESENT] = self.answerObjectPresent
d[PacketTypes.DELETE_TRANSACTION] = self.deleteTransaction
d[PacketTypes.COMMIT_TRANSACTION] = self.commitTransaction
d[PacketTypes.ASK_BEGIN_TRANSACTION] = self.askBeginTransaction
d[PacketTypes.ANSWER_BEGIN_TRANSACTION] = self.answerBeginTransaction
d[PacketTypes.FINISH_TRANSACTION] = self.finishTransaction
d[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = self.notifyTransactionFinished
d[PacketTypes.LOCK_INFORMATION] = self.lockInformation
d[PacketTypes.NOTIFY_INFORMATION_LOCKED] = self.notifyInformationLocked
d[PacketTypes.INVALIDATE_OBJECTS] = self.invalidateObjects
d[PacketTypes.UNLOCK_INFORMATION] = self.unlockInformation
d[PacketTypes.ASK_NEW_OIDS] = self.askNewOIDs
d[PacketTypes.ANSWER_NEW_OIDS] = self.answerNewOIDs
d[PacketTypes.ASK_STORE_OBJECT] = self.askStoreObject
d[PacketTypes.ANSWER_STORE_OBJECT] = self.answerStoreObject
d[PacketTypes.ABORT_TRANSACTION] = self.abortTransaction
d[PacketTypes.ASK_STORE_TRANSACTION] = self.askStoreTransaction
d[PacketTypes.ANSWER_STORE_TRANSACTION] = self.answerStoreTransaction
d[PacketTypes.ASK_OBJECT] = self.askObject
d[PacketTypes.ANSWER_OBJECT] = self.answerObject
d[PacketTypes.ASK_TIDS] = self.askTIDs
d[PacketTypes.ANSWER_TIDS] = self.answerTIDs
d[PacketTypes.ASK_TRANSACTION_INFORMATION] = self.askTransactionInformation
d[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = self.answerTransactionInformation
d[PacketTypes.ASK_OBJECT_HISTORY] = self.askObjectHistory
d[PacketTypes.ANSWER_OBJECT_HISTORY] = self.answerObjectHistory
d[PacketTypes.ASK_OIDS] = self.askOIDs
d[PacketTypes.ANSWER_OIDS] = self.answerOIDs
d[PacketTypes.ASK_PARTITION_LIST] = self.askPartitionList
d[PacketTypes.ANSWER_PARTITION_LIST] = self.answerPartitionList
d[PacketTypes.ASK_NODE_LIST] = self.askNodeList
d[PacketTypes.ANSWER_NODE_LIST] = self.answerNodeList
d[PacketTypes.SET_NODE_STATE] = self.setNodeState
d[PacketTypes.ANSWER_NODE_STATE] = self.answerNodeState
d[PacketTypes.SET_CLUSTER_STATE] = self.setClusterState
d[PacketTypes.ADD_PENDING_NODES] = self.addPendingNodes
d[PacketTypes.ANSWER_NEW_NODES] = self.answerNewNodes
d[PacketTypes.ASK_NODE_INFORMATION] = self.askNodeInformation
d[PacketTypes.ANSWER_NODE_INFORMATION] = self.answerNodeInformation
d[PacketTypes.ASK_CLUSTER_STATE] = self.askClusterState
d[PacketTypes.ANSWER_CLUSTER_STATE] = self.answerClusterState
d[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = self.notifyClusterInformation
d[PacketTypes.NOTIFY_LAST_OID] = self.notifyLastOID
return d
......
......@@ -17,7 +17,6 @@
from neo.handler import EventHandler
from neo.protocol import ErrorCodes
from neo import protocol
class CommandEventHandler(EventHandler):
""" Base handler for command """
......
......@@ -20,7 +20,7 @@ from neo.connection import ClientConnection
from neo.event import EventManager
from neo.neoctl.handler import CommandEventHandler
from neo import protocol
from neo.protocol import ClusterStates, NodeStates, ErrorCodes
from neo.protocol import ClusterStates, NodeStates, ErrorCodes, PacketTypes
class NotReadyException(Exception):
pass
......@@ -59,7 +59,7 @@ class NeoCTL(object):
if not self.connected:
raise NotReadyException, 'Connection closed'
response = response_queue.pop()
if response[0] == protocol.ERROR and \
if response[0] == PacketTypes.ERROR and \
response[1] == ErrorCodes.NOT_READY:
raise NotReadyException(response[2])
return response
......@@ -70,7 +70,7 @@ class NeoCTL(object):
"""
packet = protocol.addPendingNodes(uuid_list)
response = self.__ask(packet)
assert response[0] == protocol.ERROR
assert response[0] == PacketTypes.ERROR
assert response[1] == ErrorCodes.NO_ERROR
def setClusterState(self, state):
......@@ -79,7 +79,7 @@ class NeoCTL(object):
"""
packet = protocol.setClusterState(state)
response = self.__ask(packet)
assert response[0] == protocol.ERROR
assert response[0] == PacketTypes.ERROR
assert response[1] == ErrorCodes.NO_ERROR
def setNodeState(self, node, state, update_partition_table=False):
......@@ -92,7 +92,7 @@ class NeoCTL(object):
update_partition_table = 0
packet = protocol.setNodeState(node, state, update_partition_table)
response = self.__ask(packet)
assert response[0] == protocol.ERROR
assert response[0] == PacketTypes.ERROR
assert response[1] == ErrorCodes.NO_ERROR
def getClusterState(self):
......@@ -101,7 +101,7 @@ class NeoCTL(object):
"""
packet = protocol.askClusterState()
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_CLUSTER_STATE
assert response[0] == PacketTypes.ANSWER_CLUSTER_STATE
return response[1]
def getNodeList(self, node_type=None):
......@@ -110,7 +110,7 @@ class NeoCTL(object):
"""
packet = protocol.askNodeList(node_type)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_NODE_LIST
assert response[0] == PacketTypes.ANSWER_NODE_LIST
return response[1]
def getPartitionRowList(self, min_offset=0, max_offset=0, node=None):
......@@ -120,7 +120,7 @@ class NeoCTL(object):
"""
packet = protocol.askPartitionList(min_offset, max_offset, node)
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_PARTITION_LIST
assert response[0] == PacketTypes.ANSWER_PARTITION_LIST
return (response[1], response[2])
def startCluster(self):
......@@ -141,6 +141,6 @@ class NeoCTL(object):
"""
packet = protocol.askPrimaryMaster()
response = self.__ask(packet)
assert response[0] == protocol.ANSWER_PRIMARY_MASTER
assert response[0] == PacketTypes.ANSWER_PRIMARY_MASTER
return response[1]
......@@ -20,68 +20,6 @@ from socket import inet_ntoa, inet_aton
from neo.util import Enum
class EnumItem(int):
"""
Enumerated value type.
Not to be used outside of Enum class.
"""
def __new__(cls, enum, name, value):
instance = super(EnumItem, cls).__new__(cls, value)
instance.enum = enum
instance.name = name
return instance
def __eq__(self, other):
"""
Raise if compared type doesn't match.
"""
if not isinstance(other, EnumItem):
if isinstance(other, (int, float, long)):
raise TypeError, 'Comparing an enum with %r.' % (other, )
else:
return False
if other.enum is not self.enum:
raise TypeError, 'Comparing enums of incompatible types: %s ' \
'and %s' % (self, other)
return int(other) == int(self)
def __ne__(self, other):
return not(self == other)
def __str__(self):
return self.name
def __repr__(self):
return '<EnumItem %r (%r)>' % (self.name, int(self))
class OldEnum(dict):
"""
C-style enumerated type support with extended typechecking.
Instantiate with a dict whose keys are variable names and values are
the value of that variable.
Variables are added to module's globals and can be used directly.
The purpose of this class is purely to prevent developper from
mistakenly comparing an enumerated value with a value from another enum,
or even not from any enum at all.
"""
def __init__(self, value_dict):
global_dict = globals()
self.str_enum_dict = str_enum_dict = {}
for key, value in value_dict.iteritems():
# Only integer types are supported. This should be enough, and
# extending support to other types would only make moving to other
# languages harder.
if not isinstance(value, int):
raise TypeError, 'Enum class only support integer values.'
item = EnumItem(self, key, value)
global_dict[key] = self[value] = item
str_enum_dict[key] = item
def getFromStr(self, value, default=None):
return self.str_enum_dict.get(value, default)
# The protocol version (major, minor).
PROTOCOL_VERSION = (4, 0)
......@@ -89,222 +27,222 @@ PROTOCOL_VERSION = (4, 0)
MIN_PACKET_SIZE = 10
MAX_PACKET_SIZE = 0x4000000
PACKET_HEADER_SIZE = 10
# Message types.
packet_types = OldEnum({
# Message types.
class PacketTypes(Enum):
# Error is a special type of message, because this can be sent against any other message,
# even if such a message does not expect a reply usually. Any -> Any.
'ERROR': 0x8000,
ERROR = Enum.Item(0x8000)
# Check if a peer is still alive. Any -> Any.
'PING': 0x0001,
PING = Enum.Item(0x0001)
# Request a node identification. This must be the first packet for any connection.
# Any -> Any.
'REQUEST_NODE_IDENTIFICATION': 0x0002,
REQUEST_NODE_IDENTIFICATION = Enum.Item(0x0002)
# Accept a node identification. This should be a reply to Request Node Identification.
# Any -> Any.
'ACCEPT_NODE_IDENTIFICATION': 0x8002,
ACCEPT_NODE_IDENTIFICATION = Enum.Item(0x8002)
# Ask a current primary master node. This must be the second message when connecting
# to a master node. Any -> M.
'ASK_PRIMARY_MASTER': 0x0003,
ASK_PRIMARY_MASTER = Enum.Item(0x0003)
# Reply to Ask Primary Master. This message includes a list of known master nodes,
# to make sure that a peer has the same information. M -> Any.
'ANSWER_PRIMARY_MASTER': 0x8003,
ANSWER_PRIMARY_MASTER = Enum.Item(0x8003)
# Announce a primary master node election. PM -> SM.
'ANNOUNCE_PRIMARY_MASTER': 0x0004,
ANNOUNCE_PRIMARY_MASTER = Enum.Item(0x0004)
# Force a re-election of a primary master node. M -> M.
'REELECT_PRIMARY_MASTER': 0x0005,
REELECT_PRIMARY_MASTER = Enum.Item(0x0005)
# Notify information about one or more nodes. Any -> PM, PM -> Any.
'NOTIFY_NODE_INFORMATION': 0x0006,
NOTIFY_NODE_INFORMATION = Enum.Item(0x0006)
# Ask the last OID, the last TID and the last Partition Table ID that
# a storage node stores. Used to recover information. PM -> S, S -> PM.
'ASK_LAST_IDS': 0x0007,
ASK_LAST_IDS = Enum.Item(0x0007)
# Reply to Ask Last IDs. S -> PM, PM -> S.
'ANSWER_LAST_IDS': 0x8007,
ANSWER_LAST_IDS = Enum.Item(0x8007)
# Ask rows in a partition table that a storage node stores. Used to recover
# information. PM -> S.
'ASK_PARTITION_TABLE': 0x0008,
ASK_PARTITION_TABLE = Enum.Item(0x0008)
# Answer rows in a partition table. S -> PM.
'ANSWER_PARTITION_TABLE': 0x8008,
ANSWER_PARTITION_TABLE = Enum.Item(0x8008)
# Send rows in a partition table to update other nodes. PM -> S, C.
'SEND_PARTITION_TABLE': 0x0009,
SEND_PARTITION_TABLE = Enum.Item(0x0009)
# Notify a subset of a partition table. This is used to notify changes. PM -> S, C.
'NOTIFY_PARTITION_CHANGES': 0x000a,
NOTIFY_PARTITION_CHANGES = Enum.Item(0x000a)
# Tell a storage nodes to start an operation. Until a storage node receives this
# message, it must not serve client nodes. PM -> S.
'START_OPERATION': 0x000b,
START_OPERATION = Enum.Item(0x000b)
# Tell a storage node to stop an operation. Once a storage node receives this message,
# it must not serve client nodes. PM -> S.
'STOP_OPERATION': 0x000c,
STOP_OPERATION = Enum.Item(0x000c)
# Ask unfinished transactions' IDs. PM -> S.
'ASK_UNFINISHED_TRANSACTIONS': 0x000d,
# Ask unfinished transactions =Enum.Item(s) PM -> S.
ASK_UNFINISHED_TRANSACTIONS = Enum.Item(0x000d)
# Answer unfinished transactions' IDs. S -> PM.
'ANSWER_UNFINISHED_TRANSACTIONS': 0x800d,
# Answer unfinished transactions =Enum.Item(s) S -> PM.
ANSWER_UNFINISHED_TRANSACTIONS = Enum.Item(0x800d)
# Ask if an object is present. If not present, OID_NOT_FOUND should be returned. PM -> S.
'ASK_OBJECT_PRESENT': 0x000f,
ASK_OBJECT_PRESENT = Enum.Item(0x000f)
# Answer that an object is present. PM -> S.
'ANSWER_OBJECT_PRESENT': 0x800f,
ANSWER_OBJECT_PRESENT = Enum.Item(0x800f)
# Delete a transaction. PM -> S.
'DELETE_TRANSACTION': 0x0010,
DELETE_TRANSACTION = Enum.Item(0x0010)
# Commit a transaction. PM -> S.
'COMMIT_TRANSACTION': 0x0011,
COMMIT_TRANSACTION = Enum.Item(0x0011)
# Ask to begin a new transaction. C -> PM.
'ASK_BEGIN_TRANSACTION': 0x0012,
ASK_BEGIN_TRANSACTION = Enum.Item(0x0012)
# Answer when a transaction begin, give a TID if necessary. PM -> C.
'ANSWER_BEGIN_TRANSACTION': 0x8012,
ANSWER_BEGIN_TRANSACTION = Enum.Item(0x8012)
# Finish a transaction. C -> PM.
'FINISH_TRANSACTION': 0x0013,
FINISH_TRANSACTION = Enum.Item(0x0013)
# Notify a transaction finished. PM -> C.
'NOTIFY_TRANSACTION_FINISHED': 0x8013,
NOTIFY_TRANSACTION_FINISHED = Enum.Item(0x8013)
# Lock information on a transaction. PM -> S.
'LOCK_INFORMATION': 0x0014,
LOCK_INFORMATION = Enum.Item(0x0014)
# Notify information on a transaction locked. S -> PM.
'NOTIFY_INFORMATION_LOCKED': 0x8014,
NOTIFY_INFORMATION_LOCKED = Enum.Item(0x8014)
# Invalidate objects. PM -> C.
'INVALIDATE_OBJECTS': 0x0015,
INVALIDATE_OBJECTS = Enum.Item(0x0015)
# Unlock information on a transaction. PM -> S.
'UNLOCK_INFORMATION': 0x0016,
UNLOCK_INFORMATION = Enum.Item(0x0016)
# Ask new object IDs. C -> PM.
'ASK_NEW_OIDS': 0x0017,
ASK_NEW_OIDS = Enum.Item(0x0017)
# Answer new object IDs. PM -> C.
'ANSWER_NEW_OIDS': 0x8017,
ANSWER_NEW_OIDS = Enum.Item(0x8017)
# Ask to store an object. Send an OID, an original serial, a current
# transaction ID, and data. C -> S.
'ASK_STORE_OBJECT': 0x0018,
ASK_STORE_OBJECT = Enum.Item(0x0018)
# Answer if an object has been stored. If an object is in conflict,
# a serial of the conflicting transaction is returned. In this case,
# if this serial is newer than the current transaction ID, a client
# node must not try to resolve the conflict. S -> C.
'ANSWER_STORE_OBJECT': 0x8018,
ANSWER_STORE_OBJECT = Enum.Item(0x8018)
# Abort a transaction. C -> S, PM.
'ABORT_TRANSACTION': 0x0019,
ABORT_TRANSACTION = Enum.Item(0x0019)
# Ask to store a transaction. C -> S.
'ASK_STORE_TRANSACTION': 0x001a,
ASK_STORE_TRANSACTION = Enum.Item(0x001a)
# Answer if transaction has been stored. S -> C.
'ANSWER_STORE_TRANSACTION': 0x801a,
ANSWER_STORE_TRANSACTION = Enum.Item(0x801a)
# Ask a stored object by its OID and a serial or a TID if given. If a serial
# is specified, the specified revision of an object will be returned. If
# a TID is specified, an object right before the TID will be returned. S,C -> S.
'ASK_OBJECT': 0x001b,
ASK_OBJECT = Enum.Item(0x001b)
# Answer the requested object. S -> C.
'ANSWER_OBJECT': 0x801b,
ANSWER_OBJECT = Enum.Item(0x801b)
# Ask for TIDs between a range of offsets. The order of TIDs is descending,
# and the range is [first, last). C, S -> S.
'ASK_TIDS': 0x001d,
ASK_TIDS = Enum.Item(0x001d)
# Answer the requested TIDs. S -> C, S.
'ANSWER_TIDS': 0x801d,
ANSWER_TIDS = Enum.Item(0x801d)
# Ask information about a transaction. Any -> S.
'ASK_TRANSACTION_INFORMATION': 0x001e,
ASK_TRANSACTION_INFORMATION = Enum.Item(0x001e)
# Answer information (user, description) about a transaction. S -> Any.
'ANSWER_TRANSACTION_INFORMATION': 0x801e,
ANSWER_TRANSACTION_INFORMATION = Enum.Item(0x801e)
# Ask history information for a given object. The order of serials is
# descending, and the range is [first, last]. C, S -> S.
'ASK_OBJECT_HISTORY': 0x001f,
ASK_OBJECT_HISTORY = Enum.Item(0x001f)
# Answer history information (serial, size) for an object. S -> C, S.
'ANSWER_OBJECT_HISTORY': 0x801f,
ANSWER_OBJECT_HISTORY = Enum.Item(0x801f)
# Ask for OIDs between a range of offsets. The order of OIDs is descending,
# and the range is [first, last). S -> S.
'ASK_OIDS': 0x0020,
ASK_OIDS = Enum.Item(0x0020)
# Answer the requested OIDs. S -> S.
'ANSWER_OIDS': 0x8020,
ANSWER_OIDS = Enum.Item(0x8020)
# All the following messages are for neoctl to admin node
# Ask information about partition
'ASK_PARTITION_LIST': 0x0021,
ASK_PARTITION_LIST = Enum.Item(0x0021)
# Answer information about partition
'ANSWER_PARTITION_LIST': 0x8021,
ANSWER_PARTITION_LIST = Enum.Item(0x8021)
# Ask information about nodes
'ASK_NODE_LIST': 0x0022,
ASK_NODE_LIST = Enum.Item(0x0022)
# Answer information about nodes
'ANSWER_NODE_LIST': 0x8022,
ANSWER_NODE_LIST = Enum.Item(0x8022)
# Set the node state
'SET_NODE_STATE': 0x0023,
SET_NODE_STATE = Enum.Item(0x0023)
# Answer state of the node
'ANSWER_NODE_STATE': 0x8023,
ANSWER_NODE_STATE = Enum.Item(0x8023)
# Ask the primary to include some pending node in the partition table
'ADD_PENDING_NODES': 0x0024,
ADD_PENDING_NODES = Enum.Item(0x0024)
# Anwer what are the nodes added in the partition table
'ANSWER_NEW_NODES': 0x8024,
ANSWER_NEW_NODES = Enum.Item(0x8024)
# Ask node information
'ASK_NODE_INFORMATION': 0x0025,
ASK_NODE_INFORMATION = Enum.Item(0x0025)
# Answer node information
'ANSWER_NODE_INFORMATION': 0x8025,
ANSWER_NODE_INFORMATION = Enum.Item(0x8025)
# Set the cluster state
'SET_CLUSTER_STATE': 0x0026,
SET_CLUSTER_STATE = Enum.Item(0x0026)
# Notify information about the cluster
'NOTIFY_CLUSTER_INFORMATION': 0x0027,
NOTIFY_CLUSTER_INFORMATION = Enum.Item(0x0027)
# Ask state of the cluster
'ASK_CLUSTER_STATE': 0x0028,
ASK_CLUSTER_STATE = Enum.Item(0x0028)
# Answer state of the cluster
'ANSWER_CLUSTER_STATE': 0x8028,
ANSWER_CLUSTER_STATE = Enum.Item(0x8028)
# Notify being alive. Any -> Any.
'PONG': 0x0029,
PONG = Enum.Item(0x0029)
# Notify last OID generated
'NOTIFY_LAST_OID': 0x0030,
NOTIFY_LAST_OID = Enum.Item(0x0030)
})
PacketTypes = PacketTypes()
class ErrorCodes(Enum):
# TODO: clarify the use of each error code
......@@ -464,7 +402,7 @@ def parse(msg):
return None
msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
try:
msg_type = packet_types[msg_type]
msg_type = PacketTypes[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
......@@ -572,17 +510,17 @@ def _decodeError(body):
code = _decodeErrorCode(code)
(message, _) = _readString(body, 'message', offset=2)
return (code, message)
decode_table[ERROR] = _decodeError
decode_table[PacketTypes.ERROR] = _decodeError
@handle_errors
def _decodePing(body):
pass
decode_table[PING] = _decodePing
decode_table[PacketTypes.PING] = _decodePing
@handle_errors
def _decodePong(body):
pass
decode_table[PONG] = _decodePong
decode_table[PacketTypes.PONG] = _decodePong
@handle_errors
def _decodeRequestNodeIdentification(body):
......@@ -595,7 +533,7 @@ def _decodeRequestNodeIdentification(body):
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, address, name
decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
decode_table[PacketTypes.REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
@handle_errors
def _decodeAcceptNodeIdentification(body):
......@@ -606,12 +544,12 @@ def _decodeAcceptNodeIdentification(body):
uuid = _decodeUUID(uuid)
your_uuid == _decodeUUID(uuid)
return (node_type, uuid, address, num_partitions, num_replicas, your_uuid)
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
decode_table[PacketTypes.ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
@handle_errors
def _decodeAskPrimaryMaster(body):
pass
decode_table[ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
decode_table[PacketTypes.ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
@handle_errors
def _decodeAnswerPrimaryMaster(body):
......@@ -624,17 +562,17 @@ def _decodeAnswerPrimaryMaster(body):
known_master_list.append((address, uuid))
primary_uuid = _decodeUUID(primary_uuid)
return (primary_uuid, known_master_list)
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
decode_table[PacketTypes.ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
@handle_errors
def _decodeAnnouncePrimaryMaster(body):
pass
decode_table[ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
decode_table[PacketTypes.ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
@handle_errors
def _decodeReelectPrimaryMaster(body):
pass
decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
decode_table[PacketTypes.REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
@handle_errors
def _decodeNotifyNodeInformation(body):
......@@ -649,19 +587,19 @@ def _decodeNotifyNodeInformation(body):
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
decode_table[PacketTypes.NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
@handle_errors
def _decodeAskLastIDs(body):
pass
decode_table[ASK_LAST_IDS] = _decodeAskLastIDs
decode_table[PacketTypes.ASK_LAST_IDS] = _decodeAskLastIDs
@handle_errors
def _decodeAnswerLastIDs(body):
(loid, ltid, lptid) = unpack('!8s8s8s', body)
lptid = _decodePTID(lptid)
return (loid, ltid, lptid)
decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs
decode_table[PacketTypes.ANSWER_LAST_IDS] = _decodeAnswerLastIDs
@handle_errors
def _decodeAskPartitionTable(body):
......@@ -671,7 +609,7 @@ def _decodeAskPartitionTable(body):
offset = unpack('!L', body[4+i*4:8+i*4])[0]
offset_list.append(offset)
return (offset_list,)
decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
decode_table[PacketTypes.ASK_PARTITION_TABLE] = _decodeAskPartitionTable
@handle_errors
def _decodeAnswerPartitionTable(body):
......@@ -692,7 +630,7 @@ def _decodeAnswerPartitionTable(body):
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
decode_table[PacketTypes.ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
@handle_errors
def _decodeSendPartitionTable(body):
......@@ -713,7 +651,7 @@ def _decodeSendPartitionTable(body):
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
decode_table[PacketTypes.SEND_PARTITION_TABLE] = _decodeSendPartitionTable
@handle_errors
def _decodeNotifyPartitionChanges(body):
......@@ -726,22 +664,22 @@ def _decodeNotifyPartitionChanges(body):
uuid = _decodeUUID(uuid)
cell_list.append((offset, uuid, state))
return ptid, cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
decode_table[PacketTypes.NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
@handle_errors
def _decodeStartOperation(body):
pass
decode_table[START_OPERATION] = _decodeStartOperation
decode_table[PacketTypes.START_OPERATION] = _decodeStartOperation
@handle_errors
def _decodeStopOperation(body):
pass
decode_table[STOP_OPERATION] = _decodeStopOperation
decode_table[PacketTypes.STOP_OPERATION] = _decodeStopOperation
@handle_errors
def _decodeAskUnfinishedTransactions(body):
pass
decode_table[ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
decode_table[PacketTypes.ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
@handle_errors
def _decodeAnswerUnfinishedTransactions(body):
......@@ -751,50 +689,50 @@ def _decodeAnswerUnfinishedTransactions(body):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
decode_table[PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
@handle_errors
def _decodeAskObjectPresent(body):
(oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
decode_table[PacketTypes.ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
@handle_errors
def _decodeAnswerObjectPresent(body):
(oid, tid) = unpack('8s8s', body)
return (oid, tid)
decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
decode_table[PacketTypes.ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
@handle_errors
def _decodeDeleteTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
decode_table[PacketTypes.DELETE_TRANSACTION] = _decodeDeleteTransaction
@handle_errors
def _decodeCommitTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
decode_table[PacketTypes.COMMIT_TRANSACTION] = _decodeCommitTransaction
@handle_errors
def _decodeAskBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
decode_table[PacketTypes.ASK_BEGIN_TRANSACTION] = _decodeAskBeginTransaction
@handle_errors
def _decodeAnswerBeginTransaction(body):
(tid, ) = unpack('8s', body)
tid = _decodeTID(tid)
return (tid, )
decode_table[ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
decode_table[PacketTypes.ANSWER_BEGIN_TRANSACTION] = _decodeAnswerBeginTransaction
@handle_errors
def _decodeAskNewOIDs(body):
return unpack('!H', body) # num oids
decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
decode_table[PacketTypes.ASK_NEW_OIDS] = _decodeAskNewOIDs
@handle_errors
def _decodeAnswerNewOIDs(body):
......@@ -804,7 +742,7 @@ def _decodeAnswerNewOIDs(body):
oid = unpack('8s', body[2+i*8:10+i*8])[0]
oid_list.append(oid)
return (oid_list,)
decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
decode_table[PacketTypes.ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
@handle_errors
def _decodeFinishTransaction(body):
......@@ -814,25 +752,25 @@ def _decodeFinishTransaction(body):
oid = unpack('8s', body[12+i*8:20+i*8])[0]
oid_list.append(oid)
return (oid_list, tid)
decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction
decode_table[PacketTypes.FINISH_TRANSACTION] = _decodeFinishTransaction
@handle_errors
def _decodeNotifyTransactionFinished(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
decode_table[PacketTypes.NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
@handle_errors
def _decodeLockInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[LOCK_INFORMATION] = _decodeLockInformation
decode_table[PacketTypes.LOCK_INFORMATION] = _decodeLockInformation
@handle_errors
def _decodeNotifyInformationLocked(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
decode_table[PacketTypes.NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
@handle_errors
def _decodeInvalidateObjects(body):
......@@ -842,19 +780,19 @@ def _decodeInvalidateObjects(body):
oid = unpack('8s', body[i:i+8])[0]
oid_list.append(oid)
return (oid_list, tid)
decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
decode_table[PacketTypes.INVALIDATE_OBJECTS] = _decodeInvalidateObjects
@handle_errors
def _decodeUnlockInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
decode_table[PacketTypes.UNLOCK_INFORMATION] = _decodeUnlockInformation
@handle_errors
def _decodeAbortTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
decode_table[PacketTypes.ABORT_TRANSACTION] = _decodeAbortTransaction
@handle_errors
def _decodeAskStoreObject(body):
......@@ -862,13 +800,13 @@ def _decodeAskStoreObject(body):
oid, serial, tid, compression, checksum = r
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial, compression, checksum, data, tid)
decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
decode_table[PacketTypes.ASK_STORE_OBJECT] = _decodeAskStoreObject
@handle_errors
def _decodeAnswerStoreObject(body):
(conflicting, oid, serial) = unpack('!B8s8s', body)
return (conflicting, oid, serial)
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
decode_table[PacketTypes.ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
@handle_errors
def _decodeAskStoreTransaction(body):
......@@ -887,13 +825,13 @@ def _decodeAskStoreTransaction(body):
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
decode_table[PacketTypes.ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
@handle_errors
def _decodeAnswerStoreTransaction(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
decode_table[PacketTypes.ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
@handle_errors
def _decodeAskObject(body):
......@@ -902,7 +840,7 @@ def _decodeAskObject(body):
serial = None
tid = _decodeTID(tid)
return (oid, serial, tid)
decode_table[ASK_OBJECT] = _decodeAskObject
decode_table[PacketTypes.ASK_OBJECT] = _decodeAskObject
@handle_errors
def _decodeAnswerObject(body):
......@@ -912,12 +850,12 @@ def _decodeAnswerObject(body):
serial_end = None
(data, _) = _readString(body, 'data', offset=29)
return (oid, serial_start, serial_end, compression, checksum, data)
decode_table[ANSWER_OBJECT] = _decodeAnswerObject
decode_table[PacketTypes.ANSWER_OBJECT] = _decodeAnswerObject
@handle_errors
def _decodeAskTIDs(body):
return unpack('!QQL', body) # first, last, partition
decode_table[ASK_TIDS] = _decodeAskTIDs
decode_table[PacketTypes.ASK_TIDS] = _decodeAskTIDs
@handle_errors
def _decodeAnswerTIDs(body):
......@@ -927,13 +865,13 @@ def _decodeAnswerTIDs(body):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
return (tid_list,)
decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
decode_table[PacketTypes.ANSWER_TIDS] = _decodeAnswerTIDs
@handle_errors
def _decodeAskTransactionInformation(body):
(tid, ) = unpack('8s', body)
return (tid, )
decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
decode_table[PacketTypes.ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
@handle_errors
def _decodeAnswerTransactionInformation(body):
......@@ -952,13 +890,13 @@ def _decodeAnswerTransactionInformation(body):
body = body[8:]
oid_list.append(oid)
return (tid, user, desc, ext, oid_list)
decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
decode_table[PacketTypes.ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
@handle_errors
def _decodeAskObjectHistory(body):
(oid, first, last) = unpack('!8sQQ', body)
return (oid, first, last)
decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
decode_table[PacketTypes.ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
@handle_errors
def _decodeAnswerObjectHistory(body):
......@@ -968,12 +906,12 @@ def _decodeAnswerObjectHistory(body):
serial, size = unpack('!8sL', body[i:i+12])
history_list.append((serial, size))
return (oid, history_list)
decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
decode_table[PacketTypes.ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
@handle_errors
def _decodeAskOIDs(body):
return unpack('!QQL', body) # first, last, partition
decode_table[ASK_OIDS] = _decodeAskOIDs
decode_table[PacketTypes.ASK_OIDS] = _decodeAskOIDs
@handle_errors
def _decodeAnswerOIDs(body):
......@@ -983,14 +921,14 @@ def _decodeAnswerOIDs(body):
oid = unpack('8s', body[4+i*8:12+i*8])[0]
oid_list.append(oid)
return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
decode_table[PacketTypes.ANSWER_OIDS] = _decodeAnswerOIDs
@handle_errors
def _decodeAskPartitionList(body):
(min_offset, max_offset, uuid) = unpack('!LL16s', body)
uuid = _decodeUUID(uuid)
return (min_offset, max_offset, uuid)
decode_table[ASK_PARTITION_LIST] = _decodeAskPartitionList
decode_table[PacketTypes.ASK_PARTITION_LIST] = _decodeAskPartitionList
@handle_errors
def _decodeAnswerPartitionList(body):
......@@ -1011,14 +949,14 @@ def _decodeAnswerPartitionList(body):
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
return (ptid, row_list)
decode_table[ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList
decode_table[PacketTypes.ANSWER_PARTITION_LIST] = _decodeAnswerPartitionList
@handle_errors
def _decodeAskNodeList(body):
(node_type, ) = unpack('!H', body)
node_type = _decodeNodeType(node_type)
return (node_type,)
decode_table[ASK_NODE_LIST] = _decodeAskNodeList
decode_table[PacketTypes.ASK_NODE_LIST] = _decodeAskNodeList
@handle_errors
def _decodeAnswerNodeList(body):
......@@ -1033,7 +971,7 @@ def _decodeAnswerNodeList(body):
uuid = _decodeUUID(uuid)
node_list.append((node_type, address, uuid, state))
return (node_list,)
decode_table[ANSWER_NODE_LIST] = _decodeAnswerNodeList
decode_table[PacketTypes.ANSWER_NODE_LIST] = _decodeAnswerNodeList
@handle_errors
def _decodeSetNodeState(body):
......@@ -1041,7 +979,7 @@ def _decodeSetNodeState(body):
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state, modify)
decode_table[SET_NODE_STATE] = _decodeSetNodeState
decode_table[PacketTypes.SET_NODE_STATE] = _decodeSetNodeState
@handle_errors
def _decodeAnswerNodeState(body):
......@@ -1049,7 +987,7 @@ def _decodeAnswerNodeState(body):
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state)
decode_table[ANSWER_NODE_STATE] = _decodeAnswerNodeState
decode_table[PacketTypes.ANSWER_NODE_STATE] = _decodeAnswerNodeState
@handle_errors
def _decodeAddPendingNodes(body):
......@@ -1057,7 +995,7 @@ def _decodeAddPendingNodes(body):
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
decode_table[ADD_PENDING_NODES] = _decodeAddPendingNodes
decode_table[PacketTypes.ADD_PENDING_NODES] = _decodeAddPendingNodes
@handle_errors
def _decodeAnswerNewNodes(body):
......@@ -1065,50 +1003,50 @@ def _decodeAnswerNewNodes(body):
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
uuid_list = map(_decodeUUID, uuid_list)
return (uuid_list, )
decode_table[ANSWER_NEW_NODES] = _decodeAnswerNewNodes
decode_table[PacketTypes.ANSWER_NEW_NODES] = _decodeAnswerNewNodes
def _decodeAskNodeInformation(body):
pass # No payload
decode_table[ASK_NODE_INFORMATION] = _decodeAskNodeInformation
decode_table[ANSWER_NODE_INFORMATION] = _decodeNotifyNodeInformation
decode_table[PacketTypes.ASK_NODE_INFORMATION] = _decodeAskNodeInformation
decode_table[PacketTypes.ANSWER_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskClusterState(body):
pass
decode_table[ASK_CLUSTER_STATE] = _decodeAskClusterState
decode_table[PacketTypes.ASK_CLUSTER_STATE] = _decodeAskClusterState
@handle_errors
def _decodeAnswerClusterState(body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
decode_table[ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
decode_table[PacketTypes.ANSWER_CLUSTER_STATE] = _decodeAnswerClusterState
@handle_errors
def _decodeSetClusterState(body):
(state, ) = unpack('!H', body[:2])
state = _decodeClusterState(state)
return (state, )
decode_table[SET_CLUSTER_STATE] = _decodeSetClusterState
decode_table[PacketTypes.SET_CLUSTER_STATE] = _decodeSetClusterState
@handle_errors
def _decodeNotifyClusterInformation(body):
(state, ) = unpack('!H', body)
state = _decodeClusterState(state)
return (state, )
decode_table[NOTIFY_CLUSTER_INFORMATION] = _decodeNotifyClusterInformation
decode_table[PacketTypes.NOTIFY_CLUSTER_INFORMATION] = _decodeNotifyClusterInformation
@handle_errors
def _decodeNotifyLastOID(body):
(loid, ) = unpack('8s', body)
return (loid, )
decode_table[NOTIFY_LAST_OID] = _decodeNotifyLastOID
decode_table[PacketTypes.NOTIFY_LAST_OID] = _decodeNotifyLastOID
# Packet encoding
def _error(error_code, error_message):
body = pack('!HL', error_code, len(error_message)) + error_message
return Packet(ERROR, body)
return Packet(PacketTypes.ERROR, body)
def noError(message):
return _error(ErrorCodes.NO_ERROR, message)
......@@ -1133,17 +1071,17 @@ def tidNotFound(error_message):
return _error(ErrorCodes.TID_NOT_FOUND, 'tid not found: ' + error_message)
def ping():
return Packet(PING)
return Packet(PacketTypes.PING)
def pong():
return Packet(PONG)
return Packet(PacketTypes.PONG)
def requestNodeIdentification(node_type, uuid, address, name):
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body = pack('!LLH16s6sL', PROTOCOL_VERSION[0], PROTOCOL_VERSION[1],
node_type, uuid, address, len(name)) + name
return Packet(REQUEST_NODE_IDENTIFICATION, body)
return Packet(PacketTypes.REQUEST_NODE_IDENTIFICATION, body)
def acceptNodeIdentification(node_type, uuid, address,
num_partitions, num_replicas, your_uuid):
......@@ -1152,10 +1090,10 @@ def acceptNodeIdentification(node_type, uuid, address,
address = _encodeAddress(address)
body = pack('!H16s6sLL16s', node_type, uuid, address,
num_partitions, num_replicas, your_uuid)
return Packet(ACCEPT_NODE_IDENTIFICATION, body)
return Packet(PacketTypes.ACCEPT_NODE_IDENTIFICATION, body)
def askPrimaryMaster():
return Packet(ASK_PRIMARY_MASTER)
return Packet(PacketTypes.ASK_PRIMARY_MASTER)
def answerPrimaryMaster(primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
......@@ -1165,13 +1103,13 @@ def answerPrimaryMaster(primary_uuid, known_master_list):
address = _encodeAddress(address)
body.append(pack('!6s16s', address, uuid))
body = ''.join(body)
return Packet(ANSWER_PRIMARY_MASTER, body)
return Packet(PacketTypes.ANSWER_PRIMARY_MASTER, body)
def announcePrimaryMaster():
return Packet(ANNOUNCE_PRIMARY_MASTER)
return Packet(PacketTypes.ANNOUNCE_PRIMARY_MASTER)
def reelectPrimaryMaster():
return Packet(REELECT_PRIMARY_MASTER)
return Packet(PacketTypes.REELECT_PRIMARY_MASTER)
def notifyNodeInformation(node_list):
body = [pack('!L', len(node_list))]
......@@ -1180,10 +1118,10 @@ def notifyNodeInformation(node_list):
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(NOTIFY_NODE_INFORMATION, body)
return Packet(PacketTypes.NOTIFY_NODE_INFORMATION, body)
def askLastIDs():
return Packet(ASK_LAST_IDS)
return Packet(PacketTypes.ASK_LAST_IDS)
def answerLastIDs(loid, ltid, lptid):
# in this case, loid is a valid OID but considered as invalid. This is not
......@@ -1192,14 +1130,14 @@ def answerLastIDs(loid, ltid, lptid):
loid = INVALID_OID
ltid = _encodeTID(ltid)
lptid = _encodePTID(lptid)
return Packet(ANSWER_LAST_IDS, loid + ltid + lptid)
return Packet(PacketTypes.ANSWER_LAST_IDS, loid + ltid + lptid)
def askPartitionTable(offset_list):
body = [pack('!L', len(offset_list))]
for offset in offset_list:
body.append(pack('!L', offset))
body = ''.join(body)
return Packet(ASK_PARTITION_TABLE, body)
return Packet(PacketTypes.ASK_PARTITION_TABLE, body)
def answerPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
......@@ -1210,7 +1148,7 @@ def answerPartitionTable(ptid, row_list):
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(ANSWER_PARTITION_TABLE, body)
return Packet(PacketTypes.ANSWER_PARTITION_TABLE, body)
def sendPartitionTable(ptid, row_list):
ptid = _encodePTID(ptid)
......@@ -1221,7 +1159,7 @@ def sendPartitionTable(ptid, row_list):
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(SEND_PARTITION_TABLE, body)
return Packet(PacketTypes.SEND_PARTITION_TABLE, body)
def notifyPartitionChanges(ptid, cell_list):
ptid = _encodePTID(ptid)
......@@ -1230,78 +1168,78 @@ def notifyPartitionChanges(ptid, cell_list):
uuid = _encodeUUID(uuid)
body.append(pack('!L16sH', offset, uuid, state))
body = ''.join(body)
return Packet(NOTIFY_PARTITION_CHANGES, body)
return Packet(PacketTypes.NOTIFY_PARTITION_CHANGES, body)
def startOperation():
return Packet(START_OPERATION)
return Packet(PacketTypes.START_OPERATION)
def stopOperation():
return Packet(STOP_OPERATION)
return Packet(PacketTypes.STOP_OPERATION)
def askUnfinishedTransactions():
return Packet(ASK_UNFINISHED_TRANSACTIONS)
return Packet(PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
def answerUnfinishedTransactions(tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(ANSWER_UNFINISHED_TRANSACTIONS, body)
return Packet(PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS, body)
def askObjectPresent(oid, tid):
return Packet(ASK_OBJECT_PRESENT, oid + tid)
return Packet(PacketTypes.ASK_OBJECT_PRESENT, oid + tid)
def answerObjectPresent(oid, tid):
return Packet(ANSWER_OBJECT_PRESENT, oid + tid)
return Packet(PacketTypes.ANSWER_OBJECT_PRESENT, oid + tid)
def deleteTransaction(tid):
return Packet(DELETE_TRANSACTION, tid)
return Packet(PacketTypes.DELETE_TRANSACTION, tid)
def commitTransaction(tid):
return Packet(COMMIT_TRANSACTION, tid)
return Packet(PacketTypes.COMMIT_TRANSACTION, tid)
def askBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(ASK_BEGIN_TRANSACTION, tid)
return Packet(PacketTypes.ASK_BEGIN_TRANSACTION, tid)
def answerBeginTransaction(tid):
tid = _encodeTID(tid)
return Packet(ANSWER_BEGIN_TRANSACTION, tid)
return Packet(PacketTypes.ANSWER_BEGIN_TRANSACTION, tid)
def askNewOIDs(num_oids):
return Packet(ASK_NEW_OIDS, pack('!H', num_oids))
return Packet(PacketTypes.ASK_NEW_OIDS, pack('!H', num_oids))
def answerNewOIDs(oid_list):
body = [pack('!H', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(ANSWER_NEW_OIDS, body)
return Packet(PacketTypes.ANSWER_NEW_OIDS, body)
def finishTransaction(oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(FINISH_TRANSACTION, body)
return Packet(PacketTypes.FINISH_TRANSACTION, body)
def notifyTransactionFinished(tid):
return Packet(NOTIFY_TRANSACTION_FINISHED, tid)
return Packet(PacketTypes.NOTIFY_TRANSACTION_FINISHED, tid)
def lockInformation(tid):
return Packet(LOCK_INFORMATION, tid)
return Packet(PacketTypes.LOCK_INFORMATION, tid)
def notifyInformationLocked(tid):
return Packet(NOTIFY_INFORMATION_LOCKED, tid)
return Packet(PacketTypes.NOTIFY_INFORMATION_LOCKED, tid)
def invalidateObjects(oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(INVALIDATE_OBJECTS, body)
return Packet(PacketTypes.INVALIDATE_OBJECTS, body)
def unlockInformation(tid):
return Packet(UNLOCK_INFORMATION, tid)
return Packet(PacketTypes.UNLOCK_INFORMATION, tid)
def abortTransaction(tid):
return Packet(ABORT_TRANSACTION, tid)
return Packet(PacketTypes.ABORT_TRANSACTION, tid)
def askStoreTransaction(tid, user, desc, ext, oid_list):
lengths = (len(oid_list), len(user), len(desc), len(ext))
......@@ -1311,28 +1249,28 @@ def askStoreTransaction(tid, user, desc, ext, oid_list):
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(ASK_STORE_TRANSACTION, body)
return Packet(PacketTypes.ASK_STORE_TRANSACTION, body)
def answerStoreTransaction(tid):
return Packet(ANSWER_STORE_TRANSACTION, tid)
return Packet(PacketTypes.ANSWER_STORE_TRANSACTION, tid)
def askStoreObject(oid, serial, compression, checksum, data, tid):
if serial is None:
serial = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
return Packet(ASK_STORE_OBJECT, body)
return Packet(PacketTypes.ASK_STORE_OBJECT, body)
def answerStoreObject(conflicting, oid, serial):
if serial is None:
serial = INVALID_TID
body = pack('!B8s8s', conflicting, oid, serial)
return Packet(ANSWER_STORE_OBJECT, body)
return Packet(PacketTypes.ANSWER_STORE_OBJECT, body)
def askObject(oid, serial, tid):
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
return Packet(ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
return Packet(PacketTypes.ASK_OBJECT, pack('!8s8s8s', oid, serial, tid))
def answerObject(oid, serial_start, serial_end, compression,
checksum, data):
......@@ -1342,19 +1280,19 @@ def answerObject(oid, serial_start, serial_end, compression,
serial_end = INVALID_TID
body = pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
return Packet(ANSWER_OBJECT, body)
return Packet(PacketTypes.ANSWER_OBJECT, body)
def askTIDs(first, last, partition):
return Packet(ASK_TIDS, pack('!QQL', first, last, partition))
return Packet(PacketTypes.ASK_TIDS, pack('!QQL', first, last, partition))
def answerTIDs(tid_list):
body = [pack('!L', len(tid_list))]
body.extend(tid_list)
body = ''.join(body)
return Packet(ANSWER_TIDS, body)
return Packet(PacketTypes.ANSWER_TIDS, body)
def askTransactionInformation(tid):
return Packet(ASK_TRANSACTION_INFORMATION, pack('!8s', tid))
return Packet(PacketTypes.ASK_TRANSACTION_INFORMATION, pack('!8s', tid))
def answerTransactionInformation(tid, user, desc, ext, oid_list):
body = [pack('!8sHHHL', tid, len(user), len(desc), len(ext), len(oid_list))]
......@@ -1363,32 +1301,32 @@ def answerTransactionInformation(tid, user, desc, ext, oid_list):
body.append(ext)
body.extend(oid_list)
body = ''.join(body)
return Packet(ANSWER_TRANSACTION_INFORMATION, body)
return Packet(PacketTypes.ANSWER_TRANSACTION_INFORMATION, body)
def askObjectHistory(oid, first, last):
return Packet(ASK_OBJECT_HISTORY, pack('!8sQQ', oid, first, last))
return Packet(PacketTypes.ASK_OBJECT_HISTORY, pack('!8sQQ', oid, first, last))
def answerObjectHistory(oid, history_list):
body = [pack('!8sL', oid, len(history_list))]
for serial, size in history_list:
body.append(pack('!8sL', serial, size))
body = ''.join(body)
return Packet(ANSWER_OBJECT_HISTORY, body)
return Packet(PacketTypes.ANSWER_OBJECT_HISTORY, body)
def askOIDs(first, last, partition):
return Packet(ASK_OIDS, pack('!QQL', first, last, partition))
return Packet(PacketTypes.ASK_OIDS, pack('!QQL', first, last, partition))
def answerOIDs(oid_list):
body = [pack('!L', len(oid_list))]
body.extend(oid_list)
body = ''.join(body)
return Packet(ANSWER_OIDS, body)
return Packet(PacketTypes.ANSWER_OIDS, body)
def askPartitionList(min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack('!LL16s', min_offset, max_offset, uuid)]
body = ''.join(body)
return Packet(ASK_PARTITION_LIST, body)
return Packet(PacketTypes.ASK_PARTITION_LIST, body)
def answerPartitionList(ptid, row_list):
ptid = _encodePTID(ptid)
......@@ -1399,12 +1337,12 @@ def answerPartitionList(ptid, row_list):
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body = ''.join(body)
return Packet(ANSWER_PARTITION_LIST, body)
return Packet(PacketTypes.ANSWER_PARTITION_LIST, body)
def askNodeList(node_type):
body = [pack('!H', node_type)]
body = ''.join(body)
return Packet(ASK_NODE_LIST, body)
return Packet(PacketTypes.ASK_NODE_LIST, body)
def answerNodeList(node_list):
body = [pack('!L', len(node_list))]
......@@ -1413,34 +1351,34 @@ def answerNodeList(node_list):
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_LIST, body)
return Packet(PacketTypes.ANSWER_NODE_LIST, body)
def setNodeState(uuid, state, modify_partition_table):
uuid = _encodeUUID(uuid)
body = [pack('!16sHB', uuid, state, modify_partition_table)]
body = ''.join(body)
return Packet(SET_NODE_STATE, body)
return Packet(PacketTypes.SET_NODE_STATE, body)
def answerNodeState(uuid, state):
uuid = _encodeUUID(uuid)
body = [pack('!16sH', uuid, state)]
body = ''.join(body)
return Packet(ANSWER_NODE_STATE, body)
return Packet(PacketTypes.ANSWER_NODE_STATE, body)
def addPendingNodes(uuid_list=()):
# an empty list means all current pending nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(ADD_PENDING_NODES, body)
return Packet(PacketTypes.ADD_PENDING_NODES, body)
def answerNewNodes(uuid_list):
# an empty list means no new nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
body = pack('!H', len(uuid_list)) + ''.join(uuid_list)
return Packet(ANSWER_NEW_NODES, body)
return Packet(PacketTypes.ANSWER_NEW_NODES, body)
def askNodeInformation():
return Packet(ASK_NODE_INFORMATION)
return Packet(PacketTypes.ASK_NODE_INFORMATION)
def answerNodeInformation(node_list):
body = [pack('!L', len(node_list))]
......@@ -1448,23 +1386,23 @@ def answerNodeInformation(node_list):
uuid = _encodeUUID(uuid)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body = ''.join(body)
return Packet(ANSWER_NODE_INFORMATION, body)
return Packet(PacketTypes.ANSWER_NODE_INFORMATION, body)
def askClusterState():
return Packet(ASK_CLUSTER_STATE)
return Packet(PacketTypes.ASK_CLUSTER_STATE)
def answerClusterState(state):
body = pack('!H', state)
return Packet(ANSWER_CLUSTER_STATE, body)
return Packet(PacketTypes.ANSWER_CLUSTER_STATE, body)
def setClusterState(state):
body = pack('!H', state)
return Packet(SET_CLUSTER_STATE, body)
return Packet(PacketTypes.SET_CLUSTER_STATE, body)
def notifyClusterInformation(state):
body = pack('!H', state)
return Packet(NOTIFY_CLUSTER_INFORMATION, body)
return Packet(PacketTypes.NOTIFY_CLUSTER_INFORMATION, body)
def notifyLastOID(oid):
body = pack('!8s', oid)
return Packet(NOTIFY_LAST_OID, body)
return Packet(PacketTypes.NOTIFY_LAST_OID, body)
......@@ -22,6 +22,7 @@ import MySQLdb
from neo import logging
from mock import Mock
from neo import protocol
from neo.protocol import PacketTypes
DB_PREFIX = 'test_neo_'
DB_ADMIN = 'root'
......@@ -168,7 +169,7 @@ class NeoTestBase(unittest.TestCase):
self.assertEquals(len(calls), 1)
packet = calls[0].getParam(0)
self.assertTrue(isinstance(packet, protocol.Packet))
self.assertEquals(packet.getType(), protocol.ERROR)
self.assertEquals(packet.getType(), PacketTypes.ERROR)
if decode:
return protocol.decode_table[packet.getType()](packet._body)
return packet
......@@ -210,97 +211,97 @@ class NeoTestBase(unittest.TestCase):
return packet
def checkNotifyNodeInformation(self, conn, **kw):
return self.checkNotifyPacket(conn, protocol.NOTIFY_NODE_INFORMATION, **kw)
return self.checkNotifyPacket(conn, PacketTypes.NOTIFY_NODE_INFORMATION, **kw)
def checkSendPartitionTable(self, conn, **kw):
return self.checkNotifyPacket(conn, protocol.SEND_PARTITION_TABLE, **kw)
return self.checkNotifyPacket(conn, PacketTypes.SEND_PARTITION_TABLE, **kw)
def checkStartOperation(self, conn, **kw):
return self.checkNotifyPacket(conn, protocol.START_OPERATION, **kw)
return self.checkNotifyPacket(conn, PacketTypes.START_OPERATION, **kw)
def checkNotifyTransactionFinished(self, conn, **kw):
return self.checkNotifyPacket(conn, protocol.NOTIFY_TRANSACTION_FINISHED, **kw)
return self.checkNotifyPacket(conn, PacketTypes.NOTIFY_TRANSACTION_FINISHED, **kw)
def checkNotifyInformationLocked(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.NOTIFY_INFORMATION_LOCKED, **kw)
return self.checkAnswerPacket(conn, PacketTypes.NOTIFY_INFORMATION_LOCKED, **kw)
def checkLockInformation(self, conn, **kw):
return self.checkAskPacket(conn, protocol.LOCK_INFORMATION, **kw)
return self.checkAskPacket(conn, PacketTypes.LOCK_INFORMATION, **kw)
def checkUnlockInformation(self, conn, **kw):
return self.checkAskPacket(conn, protocol.UNLOCK_INFORMATION, **kw)
return self.checkAskPacket(conn, PacketTypes.UNLOCK_INFORMATION, **kw)
def checkRequestNodeIdentification(self, conn, **kw):
return self.checkAskPacket(conn, protocol.REQUEST_NODE_IDENTIFICATION, **kw)
return self.checkAskPacket(conn, PacketTypes.REQUEST_NODE_IDENTIFICATION, **kw)
def checkAskPrimaryMaster(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_PRIMARY_MASTER)
return self.checkAskPacket(conn, PacketTypes.ASK_PRIMARY_MASTER)
def checkAskUnfinishedTransactions(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_UNFINISHED_TRANSACTIONS)
return self.checkAskPacket(conn, PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
def checkAskTransactionInformation(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_TRANSACTION_INFORMATION, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_TRANSACTION_INFORMATION, **kw)
def checkAskObjectPresent(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_OBJECT_PRESENT, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_OBJECT_PRESENT, **kw)
def checkAskObject(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_OBJECT, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_OBJECT, **kw)
def checkAskStoreObject(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_STORE_OBJECT, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_STORE_OBJECT, **kw)
def checkAskStoreTransaction(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_STORE_TRANSACTION, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_STORE_TRANSACTION, **kw)
def checkFinishTransaction(self, conn, **kw):
return self.checkAskPacket(conn, protocol.FINISH_TRANSACTION, **kw)
return self.checkAskPacket(conn, PacketTypes.FINISH_TRANSACTION, **kw)
def checkAskNewTid(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_BEGIN_TRANSACTION, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_BEGIN_TRANSACTION, **kw)
def checkAskLastIDs(self, conn, **kw):
return self.checkAskPacket(conn, protocol.ASK_LAST_IDS, **kw)
return self.checkAskPacket(conn, PacketTypes.ASK_LAST_IDS, **kw)
def checkAcceptNodeIdentification(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ACCEPT_NODE_IDENTIFICATION, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ACCEPT_NODE_IDENTIFICATION, **kw)
def checkAnswerPrimaryMaster(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_PRIMARY_MASTER, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_PRIMARY_MASTER, **kw)
def checkAnswerLastIDs(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_LAST_IDS, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_LAST_IDS, **kw)
def checkAnswerUnfinishedTransactions(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_UNFINISHED_TRANSACTIONS, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS, **kw)
def checkAnswerObject(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_OBJECT, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_OBJECT, **kw)
def checkAnswerTransactionInformation(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_TRANSACTION_INFORMATION, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_TRANSACTION_INFORMATION, **kw)
def checkAnswerTids(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_TIDS, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_TIDS, **kw)
def checkAnswerObjectHistory(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_OBJECT_HISTORY, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_OBJECT_HISTORY, **kw)
def checkAnswerStoreTransaction(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_STORE_TRANSACTION, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_STORE_TRANSACTION, **kw)
def checkAnswerStoreObject(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_STORE_OBJECT, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_STORE_OBJECT, **kw)
def checkAnswerOids(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_OIDS, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_OIDS, **kw)
def checkAnswerPartitionTable(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_PARTITION_TABLE, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_PARTITION_TABLE, **kw)
def checkAnswerObjectPresent(self, conn, **kw):
return self.checkAnswerPacket(conn, protocol.ANSWER_OBJECT_PRESENT, **kw)
return self.checkAnswerPacket(conn, PacketTypes.ANSWER_OBJECT_PRESENT, **kw)
# XXX: imported from neo.master.test.connector since it's used at many places
......
......@@ -23,14 +23,9 @@ from struct import pack, unpack
from neo.tests import NeoTestBase
import neo.master
from neo import protocol
from neo.protocol import Packet, NodeTypes, NodeStates, INVALID_UUID
from neo.protocol import Packet, PacketTypes, NodeTypes, NodeStates
from neo.master.handlers.client import ClientServiceHandler
from neo.master.app import Application
from neo.protocol import ERROR, PING, PONG, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, \
ASK_LAST_IDS, ANSWER_LAST_IDS, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ASK_BEGIN_TRANSACTION, FINISH_TRANSACTION, \
NOTIFY_INFORMATION_LOCKED, ASK_NEW_OIDS, ABORT_TRANSACTION
from neo.exception import OperationFailure, ElectionFailure
class MasterClientHandlerTests(NeoTestBase):
......@@ -82,7 +77,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_05_handleNotifyNodeInformation(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
packet = Packet(msg_type=PacketTypes.NOTIFY_NODE_INFORMATION)
# tell the master node that is not running any longer, it must raises
conn = self.getFakeConnection(uuid, self.storage_address)
node_list = [(NodeTypes.MASTER, ('127.0.0.1', self.master_port),
......@@ -153,7 +148,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_06_handleAnswerLastIDs(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ANSWER_LAST_IDS)
loid = self.app.loid
ltid = self.app.ltid
lptid = self.app.pt.getID()
......@@ -180,7 +175,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_07_handleAskBeginTransaction(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_BEGIN_TRANSACTION)
packet = Packet(msg_type=PacketTypes.ASK_BEGIN_TRANSACTION)
ltid = self.app.ltid
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
......@@ -195,7 +190,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_08_handleAskNewOIDs(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_NEW_OIDS)
packet = Packet(msg_type=PacketTypes.ASK_NEW_OIDS)
loid = self.app.loid
# client call it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
......@@ -206,7 +201,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_09_handleFinishTransaction(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=FINISH_TRANSACTION)
packet = Packet(msg_type=PacketTypes.FINISH_TRANSACTION)
packet.setId(9)
# give an older tid than the PMN known, must abort
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
......@@ -244,7 +239,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_11_handleAbortTransaction(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ABORT_TRANSACTION)
packet = Packet(msg_type=PacketTypes.ABORT_TRANSACTION)
# give a bad tid, must not failed, just ignored it
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT, port=self.client_port)
conn = self.getFakeConnection(client_uuid, self.client_address)
......@@ -263,7 +258,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_12_handleAskLastIDs(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ASK_LAST_IDS)
# give a uuid
conn = self.getFakeConnection(uuid, self.storage_address)
ptid = self.app.pt.getID()
......@@ -280,7 +275,7 @@ class MasterClientHandlerTests(NeoTestBase):
def test_13_handleAskUnfinishedTransactions(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
# give a uuid
conn = self.getFakeConnection(uuid, self.storage_address)
service.handleAskUnfinishedTransactions(conn, packet)
......
......@@ -22,24 +22,9 @@ from mock import Mock
from struct import pack, unpack
from neo.tests import NeoTestBase
from neo import protocol
from neo.protocol import Packet, NodeTypes, NodeStates, INVALID_UUID
from neo.protocol import Packet, PacketTypes, NodeTypes, NodeStates, INVALID_UUID
from neo.master.handlers.election import ClientElectionHandler, ServerElectionHandler
from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS
from neo.exception import OperationFailure, ElectionFailure
from neo.tests import DoNothingConnector
from neo.connection import ClientConnection
......@@ -541,7 +526,7 @@ class MasterServerElectionTests(NeoTestBase):
def test_12_handleAnnouncePrimaryMaster(self):
election = self.election
uuid = self.identifyToMasterNode(port=self.master_port)
packet = Packet(msg_type=ANNOUNCE_PRIMARY_MASTER)
packet = Packet(msg_type=PacketTypes.ANNOUNCE_PRIMARY_MASTER)
# No uuid
conn = Mock({"_addPacket" : None,
"getUUID" : None,
......@@ -583,7 +568,7 @@ class MasterServerElectionTests(NeoTestBase):
def test_14_handleNotifyNodeInformation(self):
election = self.election
uuid = self.identifyToMasterNode(port=self.master_port)
packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
packet = Packet(msg_type=PacketTypes.NOTIFY_NODE_INFORMATION)
# do not answer if no uuid
conn = Mock({"getUUID" : None,
"getAddress" : ("127.0.0.1", self.master_port)})
......
......@@ -20,7 +20,7 @@ from mock import Mock
from neo.tests import NeoTestBase
from neo.master.app import Application
from neo.protocol import INVALID_PTID, INVALID_OID, INVALID_TID, \
INVALID_UUID, Packet, NOTIFY_NODE_INFORMATION
INVALID_UUID, Packet
from neo.storage.mysqldb import p64, u64
class MasterAppTests(NeoTestBase):
......
......@@ -21,25 +21,10 @@ from neo import logging
from mock import Mock
from struct import pack, unpack
from neo.tests import NeoTestBase
from neo import protocol
from neo.protocol import Packet, NodeTypes, NodeStates, CellStates, INVALID_UUID
from neo.protocol import Packet, PacketTypes
from neo.protocol import NodeTypes, NodeStates, CellStates
from neo.master.handlers.recovery import RecoveryHandler
from neo.master.app import Application
from neo.protocol import ERROR, REQUEST_NODE_IDENTIFICATION, ACCEPT_NODE_IDENTIFICATION, \
PING, PONG, ASK_PRIMARY_MASTER, ANSWER_PRIMARY_MASTER, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, START_OPERATION, \
STOP_OPERATION, ASK_LAST_IDS, ANSWER_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_PARTITION_TABLE, SEND_PARTITION_TABLE, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ANSWER_UNFINISHED_TRANSACTIONS, \
ASK_OBJECT_PRESENT, ANSWER_OBJECT_PRESENT, \
DELETE_TRANSACTION, COMMIT_TRANSACTION, ASK_BEGIN_TRANSACTION, ANSWER_BEGIN_TRANSACTION, \
FINISH_TRANSACTION, NOTIFY_TRANSACTION_FINISHED, LOCK_INFORMATION, \
NOTIFY_INFORMATION_LOCKED, INVALIDATE_OBJECTS, UNLOCK_INFORMATION, \
ASK_NEW_OIDS, ANSWER_NEW_OIDS, ASK_STORE_OBJECT, ANSWER_STORE_OBJECT, \
ABORT_TRANSACTION, ASK_STORE_TRANSACTION, ANSWER_STORE_TRANSACTION, \
ASK_OBJECT, ANSWER_OBJECT, ASK_TIDS, ANSWER_TIDS, ASK_TRANSACTION_INFORMATION, \
ANSWER_TRANSACTION_INFORMATION, ASK_OBJECT_HISTORY, ANSWER_OBJECT_HISTORY, \
ASK_OIDS, ANSWER_OIDS
from neo.exception import OperationFailure, ElectionFailure
from neo.tests import DoNothingConnector
from neo.connection import ClientConnection
......@@ -114,7 +99,7 @@ class MasterRecoveryTests(NeoTestBase):
def test_08_handleNotifyNodeInformation(self):
recovery = self.recovery
uuid = self.identifyToMasterNode(NodeTypes.MASTER, port=self.master_port)
packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
packet = Packet(msg_type=PacketTypes.NOTIFY_NODE_INFORMATION)
# tell about a client node, do nothing
conn = self.getFakeConnection(uuid, self.master_address)
node_list = [(NodeTypes.CLIENT, '127.0.0.1', self.client_port,
......@@ -172,7 +157,7 @@ class MasterRecoveryTests(NeoTestBase):
def test_09_handleAnswerLastIDs(self):
recovery = self.recovery
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ANSWER_LAST_IDS)
loid = self.app.loid
ltid = self.app.ltid
lptid = self.app.pt.getID()
......@@ -199,7 +184,7 @@ class MasterRecoveryTests(NeoTestBase):
def test_10_handleAnswerPartitionTable(self):
recovery = self.recovery
uuid = self.identifyToMasterNode(NodeTypes.MASTER, port=self.master_port)
packet = Packet(msg_type=ANSWER_PARTITION_TABLE)
packet = Packet(msg_type=PacketTypes.ANSWER_PARTITION_TABLE)
# not from target node, ignore
uuid = self.identifyToMasterNode(NodeTypes.STORAGE, port=self.storage_port)
conn = self.getFakeConnection(uuid, self.storage_port)
......
......@@ -23,14 +23,10 @@ from struct import pack, unpack
from neo.tests import NeoTestBase
import neo.master
from neo import protocol
from neo.protocol import Packet, NodeTypes, NodeStates, CellStates, INVALID_UUID
from neo.protocol import Packet, PacketTypes
from neo.protocol import NodeTypes, NodeStates, CellStates
from neo.master.handlers.storage import StorageServiceHandler
from neo.master.app import Application
from neo.protocol import ERROR, PING, PONG, ANNOUNCE_PRIMARY_MASTER, \
REELECT_PRIMARY_MASTER, NOTIFY_NODE_INFORMATION, \
ASK_LAST_IDS, ANSWER_LAST_IDS, NOTIFY_PARTITION_CHANGES, \
ASK_UNFINISHED_TRANSACTIONS, ASK_BEGIN_TRANSACTION, FINISH_TRANSACTION, \
NOTIFY_INFORMATION_LOCKED, ASK_NEW_OIDS, ABORT_TRANSACTION
from neo.exception import OperationFailure, ElectionFailure
class MasterStorageHandlerTests(NeoTestBase):
......@@ -72,7 +68,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_05_handleNotifyNodeInformation(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=NOTIFY_NODE_INFORMATION)
packet = Packet(msg_type=PacketTypes.NOTIFY_NODE_INFORMATION)
# tell the master node that is not running any longer, it must raises
conn = self.getFakeConnection(uuid, self.storage_address)
node_list = [(NodeTypes.MASTER, '127.0.0.1', self.master_port,
......@@ -142,7 +138,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_06_handleAnswerLastIDs(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ANSWER_LAST_IDS)
loid = self.app.loid
ltid = self.app.ltid
lptid = self.app.pt.getID()
......@@ -160,7 +156,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_10_handleNotifyInformationLocked(self):
service = self.service
uuid = self.identifyToMasterNode(port=10020)
packet = Packet(msg_type=NOTIFY_INFORMATION_LOCKED)
packet = Packet(msg_type=PacketTypes.NOTIFY_INFORMATION_LOCKED)
# give an older tid than the PMN known, must abort
conn = self.getFakeConnection(uuid, self.storage_address)
oid_list = []
......@@ -203,7 +199,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_12_handleAskLastIDs(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ASK_LAST_IDS)
# give a uuid
conn = self.getFakeConnection(uuid, self.storage_address)
ptid = self.app.pt.getID()
......@@ -220,7 +216,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_13_handleAskUnfinishedTransactions(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
# give a uuid
conn = self.getFakeConnection(uuid, self.storage_address)
service.handleAskUnfinishedTransactions(conn, packet)
......@@ -244,7 +240,7 @@ class MasterStorageHandlerTests(NeoTestBase):
def test_14_handleNotifyPartitionChanges(self):
service = self.service
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
packet = Packet(msg_type=PacketTypes.NOTIFY_PARTITION_CHANGES)
# do not answer if not a storage node
client_uuid = self.identifyToMasterNode(node_type=NodeTypes.CLIENT,
port=self.client_port)
......
......@@ -22,14 +22,10 @@ from mock import Mock
from struct import pack, unpack
import neo
from neo.tests import NeoTestBase
from neo.protocol import Packet, NodeTypes, NodeStates, CellStates, ErrorCodes
from neo.protocol import Packet, PacketTypes
from neo.protocol import NodeTypes, NodeStates, CellStates, ErrorCodes
from neo.master.handlers.verification import VerificationHandler
from neo.master.app import Application
from neo import protocol
from neo.protocol import ERROR, ANNOUNCE_PRIMARY_MASTER, INVALID_UUID, \
NOTIFY_NODE_INFORMATION, ANSWER_LAST_IDS, ANSWER_PARTITION_TABLE, \
ANSWER_UNFINISHED_TRANSACTIONS, ANSWER_OBJECT_PRESENT, \
ANSWER_TRANSACTION_INFORMATION
from neo.exception import OperationFailure, ElectionFailure, VerificationFailure
from neo.tests import DoNothingConnector
from neo.connection import ClientConnection
......@@ -137,7 +133,7 @@ class MasterVerificationTests(NeoTestBase):
def test_09_handleAnswerLastIDs(self):
verification = self.verification
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ANSWER_LAST_IDS)
loid = self.app.loid
ltid = self.app.ltid
lptid = '\0' * 8
......@@ -161,7 +157,7 @@ class MasterVerificationTests(NeoTestBase):
def test_11_handleAnswerUnfinishedTransactions(self):
verification = self.verification
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS)
# do nothing
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEquals(len(self.app.asking_uuid_dict), 0)
......@@ -188,7 +184,7 @@ class MasterVerificationTests(NeoTestBase):
def test_12_handleAnswerTransactionInformation(self):
verification = self.verification
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ANSWER_TRANSACTION_INFORMATION)
# do nothing, as unfinished_oid_set is None
conn = self.getFakeConnection(uuid, self.storage_address)
self.assertEquals(len(self.app.asking_uuid_dict), 0)
......@@ -260,7 +256,7 @@ class MasterVerificationTests(NeoTestBase):
def test_14_handleAnswerObjectPresent(self):
verification = self.verification
uuid = self.identifyToMasterNode()
packet = Packet(msg_type=ANSWER_OBJECT_PRESENT)
packet = Packet(msg_type=PacketTypes.ANSWER_OBJECT_PRESENT)
# do nothing as asking_uuid_dict is True
upper, lower = unpack('!LL', self.app.ltid)
new_tid = pack('!LL', upper, lower + 10)
......
......@@ -27,8 +27,8 @@ from neo.storage.handlers.client import TransactionInformation
from neo.storage.handlers.client import ClientOperationHandler
from neo.exception import PrimaryFailure, OperationFailure
from neo.pt import PartitionTable
from neo import protocol
from neo.protocol import *
from neo.protocol import PacketTypes
class StorageClientHandlerTests(NeoTestBase):
......@@ -107,14 +107,14 @@ class StorageClientHandlerTests(NeoTestBase):
def test_18_handleAskTransactionInformation1(self):
# transaction does not exists
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
self.checkErrorPacket(conn)
def test_18_handleAskTransactionInformation2(self):
# answer
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
dm = Mock({ "getTransaction": (INVALID_TID, 'user', 'desc', '', ), })
self.app.dm = dm
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
......@@ -124,7 +124,7 @@ class StorageClientHandlerTests(NeoTestBase):
# delayed response
conn = Mock({})
self.app.dm = Mock()
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.app.load_lock_dict[INVALID_OID] = object()
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
......@@ -139,7 +139,7 @@ class StorageClientHandlerTests(NeoTestBase):
# invalid serial / tid / packet not found
self.app.dm = Mock({'getObject': None})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
......@@ -155,7 +155,7 @@ class StorageClientHandlerTests(NeoTestBase):
# object found => answer
self.app.dm = Mock({'getObject': ('', '', 0, 0, '', )})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
......@@ -170,7 +170,7 @@ class StorageClientHandlerTests(NeoTestBase):
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
self.checkProtocolErrorRaised(self.operation.handleAskTIDs, conn, packet, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getTIDList')), 0)
......@@ -178,7 +178,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_25_handleAskTIDs2(self):
# well case => answer
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
self.app.pt = Mock({'getPartitions': 1})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.operation.handleAskTIDs(conn, packet, 1, 2, 1)
......@@ -190,7 +190,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_25_handleAskTIDs3(self):
# invalid partition => answer usable partitions
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getCellList': (cell, ), 'getPartitions': 1})
......@@ -206,13 +206,13 @@ class StorageClientHandlerTests(NeoTestBase):
app = self.app
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_HISTORY)
self.checkProtocolErrorRaised(self.operation.handleAskObjectHistory, conn, packet, 1, 1, None)
self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
def test_26_handleAskObjectHistory2(self):
# first case: empty history
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_HISTORY)
conn = Mock({})
self.app.dm = Mock({'getObjectHistory': None})
self.operation.handleAskObjectHistory(conn, packet, INVALID_OID, 1, 2)
......@@ -225,7 +225,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_27_handleAskStoreTransaction2(self):
# add transaction entry
packet = Packet(msg_type=ASK_STORE_TRANSACTION)
packet = Packet(msg_type=PacketTypes.ASK_STORE_TRANSACTION)
conn = Mock({'getUUID': self.getNewUUID()})
self.operation.handleAskStoreTransaction(conn, packet,
INVALID_TID, '', '', '', ())
......@@ -237,7 +237,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_28_handleAskStoreObject2(self):
# locked => delayed response
packet = Packet(msg_type=ASK_STORE_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
oid = '\x02' * 8
tid1, tid2 = self.getTwoIDs()
......@@ -254,7 +254,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_28_handleAskStoreObject3(self):
# locked => unresolvable conflict => answer
packet = Packet(msg_type=ASK_STORE_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
tid1, tid2 = self.getTwoIDs()
self.app.store_lock_dict[INVALID_OID] = tid2
......@@ -268,7 +268,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_28_handleAskStoreObject4(self):
# resolvable conflict => answer
packet = Packet(msg_type=ASK_STORE_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
self.app.dm = Mock({'getObjectHistory':((self.getNewUUID(), ), )})
self.assertEquals(self.app.store_lock_dict.get(INVALID_OID, None), None)
......@@ -282,7 +282,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_28_handleAskStoreObject5(self):
# no conflict => answer
packet = Packet(msg_type=ASK_STORE_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_STORE_OBJECT)
conn = Mock({'getUUID': self.app.uuid})
self.operation.handleAskStoreObject(conn, packet, INVALID_OID,
INVALID_SERIAL, 0, 0, '', INVALID_TID)
......@@ -297,7 +297,7 @@ class StorageClientHandlerTests(NeoTestBase):
def test_29_handleAbortTransaction(self):
# remove transaction
packet = Packet(msg_type=ABORT_TRANSACTION)
packet = Packet(msg_type=PacketTypes.ABORT_TRANSACTION)
conn = Mock({'getUUID': self.app.uuid})
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.called = False
......
......@@ -24,14 +24,7 @@ from neo import protocol
from neo.pt import PartitionTable
from neo.storage.app import Application
from neo.storage.handlers.initialization import InitializationHandler
from neo.protocol import Packet, CellStates, INVALID_UUID, INVALID_TID
from neo.protocol import ACCEPT_NODE_IDENTIFICATION, REQUEST_NODE_IDENTIFICATION, \
NOTIFY_PARTITION_CHANGES, STOP_OPERATION, ASK_LAST_IDS, ASK_PARTITION_TABLE, \
ANSWER_OBJECT_PRESENT, ASK_OBJECT_PRESENT, LOCK_INFORMATION, \
UNLOCK_INFORMATION, ASK_TRANSACTION_INFORMATION, \
COMMIT_TRANSACTION, ASK_UNFINISHED_TRANSACTIONS, SEND_PARTITION_TABLE
from neo.protocol import ERROR, ASK_PRIMARY_MASTER
from neo.protocol import ANSWER_PRIMARY_MASTER
from neo.protocol import Packet, PacketTypes, CellStates
from neo.exception import PrimaryFailure, OperationFailure
from neo.storage.mysqldb import MySQLDatabaseManager, p64, u64
......@@ -91,7 +84,7 @@ class StorageInitializationHandlerTests(NeoTestBase):
self.checkNoPacketSent(conn)
def test_09_handleSendPartitionTable(self):
packet = Packet(msg_type=SEND_PARTITION_TABLE)
packet = Packet(msg_type=PacketTypes.SEND_PARTITION_TABLE)
uuid = self.getNewUUID()
# send a table
conn = Mock({"getUUID" : uuid,
......
......@@ -26,7 +26,7 @@ from neo.storage.app import Application
from neo.storage.handlers.master import MasterOperationHandler
from neo.exception import PrimaryFailure, OperationFailure
from neo.pt import PartitionTable
from neo.protocol import CellStates
from neo.protocol import CellStates, PacketTypes
from neo.protocol import *
class StorageMasterHandlerTests(NeoTestBase):
......@@ -100,7 +100,7 @@ class StorageMasterHandlerTests(NeoTestBase):
"getAddress" : ("127.0.0.1", self.master_port),
})
app.replicator = Mock({})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
packet = Packet(msg_type=PacketTypes.NOTIFY_PARTITION_CHANGES)
self.app.pt = Mock({'getID': 1})
count = len(self.app.nm.getList())
self.operation.handleNotifyPartitionChanges(conn, packet, 0, ())
......@@ -124,7 +124,7 @@ class StorageMasterHandlerTests(NeoTestBase):
"isServer": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
packet = Packet(msg_type=PacketTypes.NOTIFY_PARTITION_CHANGES)
app = self.app
# register nodes
app.nm.createStorage(uuid=uuid1)
......@@ -147,14 +147,14 @@ class StorageMasterHandlerTests(NeoTestBase):
def test_16_handleStopOperation1(self):
# OperationFailure
conn = Mock({ 'isServer': False })
packet = Packet(msg_type=STOP_OPERATION)
packet = Packet(msg_type=PacketTypes.STOP_OPERATION)
self.assertRaises(OperationFailure, self.operation.handleStopOperation, conn, packet)
def test_22_handleLockInformation2(self):
# load transaction informations
conn = Mock({ 'isServer': False, })
self.app.dm = Mock({ })
packet = Packet(msg_type=LOCK_INFORMATION)
packet = Packet(msg_type=PacketTypes.LOCK_INFORMATION)
packet.setId(1)
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.app.transaction_dict[INVALID_TID] = transaction
......@@ -173,7 +173,7 @@ class StorageMasterHandlerTests(NeoTestBase):
# delete transaction informations
conn = Mock({ 'isServer': False, })
self.app.dm = Mock({ })
packet = Packet(msg_type=LOCK_INFORMATION)
packet = Packet(msg_type=PacketTypes.LOCK_INFORMATION)
packet.setId(1)
transaction = Mock({ 'getObjectList': ((0, ), ), })
self.app.transaction_dict[INVALID_TID] = transaction
......@@ -195,7 +195,7 @@ class StorageMasterHandlerTests(NeoTestBase):
def test_30_handleAnswerLastIDs(self):
# set critical TID on replicator
conn = Mock()
packet = Packet(msg_type=ANSWER_LAST_IDS)
packet = Packet(msg_type=PacketTypes.ANSWER_LAST_IDS)
self.app.replicator = Mock()
self.operation.handleAnswerLastIDs(
conn=conn,
......@@ -211,7 +211,7 @@ class StorageMasterHandlerTests(NeoTestBase):
def test_31_handleAnswerUnfinishedTransactions(self):
# set unfinished TID on replicator
conn = Mock()
packet = Packet(msg_type=ANSWER_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ANSWER_UNFINISHED_TRANSACTIONS)
self.app.replicator = Mock()
self.operation.handleAnswerUnfinishedTransactions(
conn=conn,
......
......@@ -19,8 +19,8 @@ import unittest, logging, os
from mock import Mock
from neo.tests import NeoTestBase
from neo.storage.app import Application
from neo.protocol import CellStates, INVALID_PTID, INVALID_TID, \
INVALID_UUID, Packet, NOTIFY_NODE_INFORMATION
from neo.protocol import Packet, CellStates, PacketTypes, \
INVALID_PTID, INVALID_TID, INVALID_UUID
from neo.storage.mysqldb import p64, u64, MySQLDatabaseManager
from collections import deque
from neo.pt import PartitionTable
......
......@@ -24,8 +24,8 @@ from collections import deque
from neo.tests import NeoTestBase
from neo.storage.app import Application
from neo.storage.handlers.storage import StorageOperationHandler
from neo import protocol
from neo.protocol import *
from neo.protocol import PacketTypes
class StorageStorageHandlerTests(NeoTestBase):
......@@ -65,14 +65,14 @@ class StorageStorageHandlerTests(NeoTestBase):
def test_18_handleAskTransactionInformation1(self):
# transaction does not exists
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
self.checkErrorPacket(conn)
def test_18_handleAskTransactionInformation2(self):
# answer
conn = Mock({ })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
dm = Mock({ "getTransaction": (INVALID_TID, 'user', 'desc', '', ), })
self.app.dm = dm
self.operation.handleAskTransactionInformation(conn, packet, INVALID_TID)
......@@ -82,7 +82,7 @@ class StorageStorageHandlerTests(NeoTestBase):
# delayed response
conn = Mock({})
self.app.dm = Mock()
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.app.load_lock_dict[INVALID_OID] = object()
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
......@@ -97,7 +97,7 @@ class StorageStorageHandlerTests(NeoTestBase):
# invalid serial / tid / packet not found
self.app.dm = Mock({'getObject': None})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
......@@ -113,7 +113,7 @@ class StorageStorageHandlerTests(NeoTestBase):
# object found => answer
self.app.dm = Mock({'getObject': ('', '', 0, 0, '', )})
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT)
self.assertEquals(len(self.app.event_queue), 0)
self.operation.handleAskObject(conn, packet,
oid=INVALID_OID,
......@@ -128,7 +128,7 @@ class StorageStorageHandlerTests(NeoTestBase):
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
self.checkProtocolErrorRaised(self.operation.handleAskTIDs, conn, packet, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getTIDList')), 0)
......@@ -136,7 +136,7 @@ class StorageStorageHandlerTests(NeoTestBase):
def test_25_handleAskTIDs2(self):
# well case => answer
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getPartitions': 1})
self.operation.handleAskTIDs(conn, packet, 1, 2, 1)
......@@ -148,7 +148,7 @@ class StorageStorageHandlerTests(NeoTestBase):
def test_25_handleAskTIDs3(self):
# invalid partition => answer usable partitions
conn = Mock({})
packet = Packet(msg_type=ASK_TIDS)
packet = Packet(msg_type=PacketTypes.ASK_TIDS)
cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getTIDList': (INVALID_TID, )})
self.app.pt = Mock({'getCellList': (cell, ), 'getPartitions': 1})
......@@ -164,13 +164,13 @@ class StorageStorageHandlerTests(NeoTestBase):
app = self.app
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_HISTORY)
self.checkProtocolErrorRaised(self.operation.handleAskObjectHistory, conn, packet, 1, 1, None)
self.assertEquals(len(app.dm.mockGetNamedCalls('getObjectHistory')), 0)
def test_26_handleAskObjectHistory2(self):
# first case: empty history
packet = Packet(msg_type=ASK_OBJECT_HISTORY)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_HISTORY)
conn = Mock({})
self.app.dm = Mock({'getObjectHistory': None})
self.operation.handleAskObjectHistory(conn, packet, INVALID_OID, 1, 2)
......@@ -187,7 +187,7 @@ class StorageStorageHandlerTests(NeoTestBase):
app.pt = Mock()
app.dm = Mock()
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
packet = Packet(msg_type=PacketTypes.ASK_OIDS)
self.checkProtocolErrorRaised(self.operation.handleAskOIDs, conn, packet, 1, 1, None)
self.assertEquals(len(app.pt.mockGetNamedCalls('getCellList')), 0)
self.assertEquals(len(app.dm.mockGetNamedCalls('getOIDList')), 0)
......@@ -195,7 +195,7 @@ class StorageStorageHandlerTests(NeoTestBase):
def test_25_handleAskOIDs2(self):
# well case > answer OIDs
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
packet = Packet(msg_type=PacketTypes.ASK_OIDS)
self.app.pt = Mock({'getPartitions': 1})
self.app.dm = Mock({'getOIDList': (INVALID_OID, )})
self.operation.handleAskOIDs(conn, packet, 1, 2, 1)
......@@ -207,7 +207,7 @@ class StorageStorageHandlerTests(NeoTestBase):
def test_25_handleAskOIDs3(self):
# invalid partition => answer usable partitions
conn = Mock({})
packet = Packet(msg_type=ASK_OIDS)
packet = Packet(msg_type=PacketTypes.ASK_OIDS)
cell = Mock({'getUUID':self.app.uuid})
self.app.dm = Mock({'getOIDList': (INVALID_OID, )})
self.app.pt = Mock({'getCellList': (cell, ), 'getPartitions': 1})
......
......@@ -20,14 +20,11 @@ import unittest
from neo import logging
from mock import Mock
from neo.tests import NeoTestBase
from neo import protocol
from neo.pt import PartitionTable
from neo.storage.app import Application
from neo.storage.handlers.verification import VerificationHandler
from neo.protocol import Packet, CellStates, NodeTypes, ErrorCodes
from neo.protocol import NOTIFY_PARTITION_CHANGES, STOP_OPERATION, \
ASK_OBJECT_PRESENT, ASK_TRANSACTION_INFORMATION, COMMIT_TRANSACTION, \
ASK_UNFINISHED_TRANSACTIONS, INVALID_OID, INVALID_TID
from neo.protocol import Packet, PacketTypes, CellStates, NodeTypes, ErrorCodes
from neo.protocol import INVALID_OID, INVALID_TID
from neo.exception import PrimaryFailure, OperationFailure
from neo.storage.mysqldb import MySQLDatabaseManager, p64, u64
......@@ -177,7 +174,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
"isServer": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
packet = Packet(msg_type=PacketTypes.NOTIFY_PARTITION_CHANGES)
self.verification.handleNotifyPartitionChanges(conn, packet, 1, ())
self.verification.handleNotifyPartitionChanges(conn, packet, 0, ())
self.assertEqual(self.app.pt.getID(), 1)
......@@ -187,7 +184,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
"isServer": False,
"getAddress" : ("127.0.0.1", self.master_port),
})
packet = Packet(msg_type=NOTIFY_PARTITION_CHANGES)
packet = Packet(msg_type=PacketTypes.NOTIFY_PARTITION_CHANGES)
new_uuid = self.getNewUUID()
cell = (0, new_uuid, CellStates.UP_TO_DATE)
self.app.nm.createStorage(uuid=new_uuid)
......@@ -206,21 +203,21 @@ class StorageVerificationHandlerTests(NeoTestBase):
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False })
self.assertFalse(self.app.operational)
packet = Packet(msg_type=STOP_OPERATION)
packet = Packet(msg_type=PacketTypes.STOP_OPERATION)
self.verification.handleStartOperation(conn, packet)
self.assertTrue(self.app.operational)
def test_12_handleStopOperation(self):
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False })
packet = Packet(msg_type=STOP_OPERATION)
packet = Packet(msg_type=PacketTypes.STOP_OPERATION)
self.assertRaises(OperationFailure, self.verification.handleStopOperation, conn, packet)
def test_13_handleAskUnfinishedTransactions(self):
# client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False})
packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
self.verification.handleAskUnfinishedTransactions(conn, packet)
(tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 0)
......@@ -232,7 +229,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
self.app.dm.commit()
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False})
packet = Packet(msg_type=ASK_UNFINISHED_TRANSACTIONS)
packet = Packet(msg_type=PacketTypes.ASK_UNFINISHED_TRANSACTIONS)
self.verification.handleAskUnfinishedTransactions(conn, packet)
(tid_list, ) = self.checkAnswerUnfinishedTransactions(conn, decode=True)
self.assertEqual(len(tid_list), 1)
......@@ -242,7 +239,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# ask from client conn with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.verification.handleAskTransactionInformation(conn, packet, p64(1))
code, message = self.checkErrorPacket(conn, decode=True)
self.assertEqual(code, ErrorCodes.TID_NOT_FOUND)
......@@ -257,7 +254,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# object from trans
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.verification.handleAskTransactionInformation(conn, packet, p64(1))
tid, user, desc, ext, oid_list = self.checkAnswerTransactionInformation(conn, decode=True)
self.assertEqual(u64(tid), 1)
......@@ -269,7 +266,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# object from ttrans
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.verification.handleAskTransactionInformation(conn, packet, p64(3))
tid, user, desc, ext, oid_list = self.checkAnswerTransactionInformation(conn, decode=True)
self.assertEqual(u64(tid), 3)
......@@ -283,7 +280,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': True })
# find the one in trans
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.verification.handleAskTransactionInformation(conn, packet, p64(1))
tid, user, desc, ext, oid_list = self.checkAnswerTransactionInformation(conn, decode=True)
self.assertEqual(u64(tid), 1)
......@@ -295,7 +292,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# do not find the one in ttrans
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': True })
packet = Packet(msg_type=ASK_TRANSACTION_INFORMATION)
packet = Packet(msg_type=PacketTypes.ASK_TRANSACTION_INFORMATION)
self.verification.handleAskTransactionInformation(conn, packet, p64(2))
code, message = self.checkErrorPacket(conn, decode=True)
self.assertEqual(code, ErrorCodes.TID_NOT_FOUND)
......@@ -304,7 +301,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False})
packet = Packet(msg_type=ASK_OBJECT_PRESENT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_PRESENT)
self.verification.handleAskObjectPresent(conn, packet, p64(1), p64(2))
code, message = self.checkErrorPacket(conn, decode=True)
self.assertEqual(code, ErrorCodes.OID_NOT_FOUND)
......@@ -316,7 +313,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
self.app.dm.commit()
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False})
packet = Packet(msg_type=ASK_OBJECT_PRESENT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_PRESENT)
self.verification.handleAskObjectPresent(conn, packet, p64(1), p64(2))
oid, tid = self.checkAnswerObjectPresent(conn, decode=True)
self.assertEqual(u64(tid), 2)
......@@ -326,7 +323,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
# client connection with no data
conn = Mock({ "getAddress" : ("127.0.0.1", self.master_port),
'isServer': False})
packet = Packet(msg_type=ASK_OBJECT_PRESENT)
packet = Packet(msg_type=PacketTypes.ASK_OBJECT_PRESENT)
self.verification.handleDeleteTransaction(conn, packet, p64(1))
# client connection with data
self.app.dm.begin()
......@@ -343,7 +340,7 @@ class StorageVerificationHandlerTests(NeoTestBase):
'isServer': False })
dm = Mock()
self.app.dm = dm
packet = Packet(msg_type=COMMIT_TRANSACTION)
packet = Packet(msg_type=PacketTypes.COMMIT_TRANSACTION)
self.verification.handleCommitTransaction(conn, packet, p64(1))
self.assertEqual(len(dm.mockGetNamedCalls("finishTransaction")), 1)
call = dm.mockGetNamedCalls("finishTransaction")[0]
......
......@@ -25,7 +25,7 @@ from neo.handler import EventHandler
from neo.tests import DoNothingConnector
from neo.connector import ConnectorException, ConnectorTryAgainException, \
ConnectorInProgressException, ConnectorConnectionRefusedException
from neo.protocol import Packet, ProtocolError, ANSWER_PRIMARY_MASTER
from neo.protocol import Packet, ProtocolError, PacketTypes
from neo import protocol
from neo.tests import NeoTestBase
......@@ -721,7 +721,7 @@ class ConnectionTests(NeoTestBase):
self.assertEquals(len(bc._queue.mockGetNamedCalls("append")), 1)
call = bc._queue.mockGetNamedCalls("append")[0]
data = call.getParam(0)
self.assertEqual(data.getType(), ANSWER_PRIMARY_MASTER)
self.assertEqual(data.getType(), PacketTypes.ANSWER_PRIMARY_MASTER)
self.assertEqual(data.getId(), 1)
self.assertEqual(len(bc.event_dict), 0)
self.assertEqual(bc.read_buf, '')
......
......@@ -20,7 +20,6 @@ from time import time
from neo.tests import NeoTestBase
from neo.epoll import Epoll
from neo.event import EpollEventManager, IdleEvent
from neo.protocol import Packet, PING
class EventTests(NeoTestBase):
......
......@@ -19,7 +19,8 @@ import unittest, os
from mock import Mock
from neo import protocol
from neo.protocol import *
from neo.protocol import NodeTypes, NodeStates, CellStates, ErrorCodes
from neo.protocol import NodeTypes, NodeStates, CellStates
from neo.protocol import ErrorCodes, PacketTypes
from neo.tests import NeoTestBase
from neo.util import getNextTID
from time import time, gmtime
......@@ -37,8 +38,8 @@ class ProtocolTests(NeoTestBase):
return self.ltid
def test_01_Packet_init(self):
p = Packet(msg_type=ASK_PRIMARY_MASTER, body=None)
self.assertEqual(p.getType(), ASK_PRIMARY_MASTER)
p = Packet(msg_type=PacketTypes.ASK_PRIMARY_MASTER, body=None)
self.assertEqual(p.getType(), PacketTypes.ASK_PRIMARY_MASTER)
self.assertEqual(len(p), PACKET_HEADER_SIZE)
def test_02_error(self):
......@@ -252,7 +253,7 @@ class ProtocolTests(NeoTestBase):
def test_30_deleteTransaction(self):
tid = self.getNextTID()
p = protocol.deleteTransaction(tid)
self.assertEqual(p.getType(), DELETE_TRANSACTION)
self.assertEqual(p.getType(), PacketTypes.DELETE_TRANSACTION)
ptid = p.decode()[0]
self.assertEqual(ptid, tid)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment