Commit 1be82862 authored by Grégory Wisniewski's avatar Grégory Wisniewski

Move packet decoding method at module level instead of Packet class as it was

done for encoding.


git-svn-id: https://svn.erp5.org/repos/neo/branches/prototype3@500 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d9b88d8b
......@@ -209,7 +209,7 @@ class Connection(BaseConnection):
while 1:
packet = None
try:
packet = Packet.parse(self.read_buf)
packet = protocol.parse(self.read_buf)
except PacketMalformedError, msg:
self.handler.packetMalformed(self, packet, msg)
return
......
......@@ -321,6 +321,8 @@ UUID_NAMESPACES = {
class ProtocolError(Exception): pass
class PacketMalformedError(ProtocolError): pass
decode_table = {}
class Packet(object):
"""A packet."""
......@@ -328,27 +330,6 @@ class Packet(object):
_type = None
_len = None
@classmethod
def parse(cls, msg):
# logging.debug('parsing %s', dump(msg))
if len(msg) < MIN_PACKET_SIZE:
return None
msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
try:
msg_type = packet_types[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len)
if len(msg) < msg_len:
# Not enough.
return None
packet = cls(msg_type, msg[PACKET_HEADER_SIZE:msg_len])
packet.setId(msg_id)
return packet
def __init__(self, msg_type, body=''):
self._id = None
self._type = msg_type
......@@ -369,7 +350,6 @@ class Packet(object):
except TypeError:
return PACKET_HEADER_SIZE
# Encoders.
def encode(self):
msg = pack('!LHL', self._id, self._type, PACKET_HEADER_SIZE + len(self._body)) + self._body
if len(msg) > MAX_PACKET_SIZE:
......@@ -377,502 +357,520 @@ class Packet(object):
return msg
__str__ = encode
# Decoders.
def decode(self):
try:
method = self.decode_table[self._type]
method = decode_table[self._type]
except KeyError:
raise PacketMalformedError('unknown message type 0x%x' % self._type)
return method(self)
decode_table = {}
def _decodeError(self):
try:
body = self._body
code, size = unpack('!HL', body[:6])
message = body[6:]
except struct.error, msg:
raise PacketMalformedError('invalid error message')
if len(message) != size:
raise PacketMalformedError('invalid error message size')
return code, message
decode_table[ERROR] = _decodeError
def _decodePing(self):
pass
decode_table[PING] = _decodePing
def _decodePong(self):
pass
decode_table[PONG] = _decodePong
def _decodeRequestNodeIdentification(self):
try:
body = self._body
major, minor, node_type, uuid, ip_address, port, size \
= unpack('!LLH16s4sHL', body[:36])
return method(self._body)
# packet parser
def parse(msg):
# logging.debug('parsing %s', dump(msg))
if len(msg) < MIN_PACKET_SIZE:
return None
msg_id, msg_type, msg_len = unpack('!LHL', msg[:PACKET_HEADER_SIZE])
try:
msg_type = packet_types[msg_type]
except KeyError:
raise PacketMalformedError('Unknown packet type')
if msg_len > MAX_PACKET_SIZE:
raise PacketMalformedError('message too big (%d)' % msg_len)
if msg_len < MIN_PACKET_SIZE:
raise PacketMalformedError('message too small (%d)' % msg_len)
if len(msg) < msg_len:
# Not enough.
return None
packet = Packet(msg_type, msg[PACKET_HEADER_SIZE:msg_len])
packet.setId(msg_id)
return packet
# packet decoding
def _decodeError(body):
try:
body = body
code, size = unpack('!HL', body[:6])
message = body[6:]
except struct.error, msg:
raise PacketMalformedError('invalid error message')
if len(message) != size:
raise PacketMalformedError('invalid error message size')
return code, message
decode_table[ERROR] = _decodeError
def _decodePing(body):
pass
decode_table[PING] = _decodePing
def _decodePong(body):
pass
decode_table[PONG] = _decodePong
def _decodeRequestNodeIdentification(body):
try:
body = body
major, minor, node_type, uuid, ip_address, port, size \
= unpack('!LLH16s4sHL', body[:36])
ip_address = inet_ntoa(ip_address)
name = body[36:]
except struct.error, msg:
raise PacketMalformedError('invalid request node identification')
if size != len(name):
raise PacketMalformedError('invalid name size')
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, ip_address, port, name
decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
def _decodeAcceptNodeIdentification(body):
try:
node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid \
= unpack('!H16s4sHLL16s', body)
ip_address = inet_ntoa(ip_address)
except struct.error, msg:
raise PacketMalformedError('invalid accept node identification')
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
return node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
def _decodeAskPrimaryMaster(body):
pass
decode_table[ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
def _decodeAnswerPrimaryMaster(body):
try:
primary_uuid, n = unpack('!16sL', body[:20])
known_master_list = []
for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', body[20+i*22:42+i*22])
ip_address = inet_ntoa(ip_address)
name = body[36:]
except struct.error, msg:
raise PacketMalformedError('invalid request node identification')
if size != len(name):
raise PacketMalformedError('invalid name size')
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
if (major, minor) != PROTOCOL_VERSION:
raise PacketMalformedError('protocol version mismatch')
return node_type, uuid, ip_address, port, name
decode_table[REQUEST_NODE_IDENTIFICATION] = _decodeRequestNodeIdentification
def _decodeAcceptNodeIdentification(self):
try:
node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid \
= unpack('!H16s4sHLL16s', self._body)
known_master_list.append((ip_address, port, uuid))
except struct.error, msg:
raise PacketMalformedError('invalid answer primary master')
return primary_uuid, known_master_list
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
def _decodeAnnouncePrimaryMaster(body):
pass
decode_table[ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
def _decodeReelectPrimaryMaster(body):
pass
decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
def _decodeNotifyNodeInformation(body):
try:
n = unpack('!L', body[:4])[0]
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
except struct.error, msg:
raise PacketMalformedError('invalid accept node identification')
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
return node_type, uuid, ip_address, port, num_partitions, num_replicas, your_uuid
decode_table[ACCEPT_NODE_IDENTIFICATION] = _decodeAcceptNodeIdentification
def _decodeAskPrimaryMaster(self):
pass
decode_table[ASK_PRIMARY_MASTER] = _decodeAskPrimaryMaster
def _decodeAnswerPrimaryMaster(self):
try:
primary_uuid, n = unpack('!16sL', self._body[:20])
known_master_list = []
for i in xrange(n):
ip_address, port, uuid = unpack('!4sH16s', self._body[20+i*22:42+i*22])
ip_address = inet_ntoa(ip_address)
known_master_list.append((ip_address, port, uuid))
except struct.error, msg:
raise PacketMalformedError('invalid answer primary master')
return primary_uuid, known_master_list
decode_table[ANSWER_PRIMARY_MASTER] = _decodeAnswerPrimaryMaster
def _decodeAnnouncePrimaryMaster(self):
pass
decode_table[ANNOUNCE_PRIMARY_MASTER] = _decodeAnnouncePrimaryMaster
def _decodeReelectPrimaryMaster(self):
pass
decode_table[REELECT_PRIMARY_MASTER] = _decodeReelectPrimaryMaster
def _decodeNotifyNodeInformation(self):
try:
n = unpack('!L', self._body[:4])[0]
node_list = []
for i in xrange(n):
r = unpack('!H4sH16sH', self._body[4+i*26:30+i*26])
node_type, ip_address, port, uuid, state = r
ip_address = inet_ntoa(ip_address)
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST:
raise PacketMalformedError('invalid node state %d' % state)
node_list.append((node_type, ip_address, port, uuid, state))
except PacketMalformedError:
raise
except struct.error, msg:
raise PacketMalformedError('invalid answer node information')
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskLastIDs(self):
pass
decode_table[ASK_LAST_IDS] = _decodeAskLastIDs
def _decodeAnswerLastIDs(self):
try:
loid, ltid, lptid = unpack('!8s8s8s', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid answer last ids')
return loid, ltid, lptid
decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs
def _decodeAskPartitionTable(self):
try:
n = unpack('!L', self._body[:4])[0]
offset_list = []
for i in xrange(n):
offset = unpack('!L', self._body[4+i*4:8+i*4])[0]
offset_list.append(offset)
except struct.error, msg:
raise PacketMalformedError('invalid ask partition table')
return (offset_list,)
decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
def _decodeAnswerPartitionTable(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', self._body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', self._body[index:index+18])
index += 18
state = partition_cell_states.get(state)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
except struct.error, msg:
raise PacketMalformedError('invalid answer partition table')
return ptid, row_list
decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
def _decodeSendPartitionTable(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', self._body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', self._body[index:index+18])
index += 18
state = partition_cell_states.get(state)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
except struct.error, msg:
raise PacketMalformedError('invalid send partition table')
return ptid, row_list
decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
def _decodeNotifyPartitionChanges(self):
try:
ptid, n = unpack('!8sL', self._body[:12])
cell_list = []
for i in xrange(n):
offset, uuid, state = unpack('!L16sH', self._body[12+i*22:34+i*22])
if node_type not in VALID_NODE_TYPE_LIST:
raise PacketMalformedError('invalid node type %d' % node_type)
state = node_states.get(state)
if state not in VALID_NODE_STATE_LIST:
raise PacketMalformedError('invalid node state %d' % state)
node_list.append((node_type, ip_address, port, uuid, state))
except PacketMalformedError:
raise
except struct.error, msg:
raise PacketMalformedError('invalid answer node information')
return (node_list,)
decode_table[NOTIFY_NODE_INFORMATION] = _decodeNotifyNodeInformation
def _decodeAskLastIDs(body):
pass
decode_table[ASK_LAST_IDS] = _decodeAskLastIDs
def _decodeAnswerLastIDs(body):
try:
loid, ltid, lptid = unpack('!8s8s8s', body)
except struct.error, msg:
raise PacketMalformedError('invalid answer last ids')
return loid, ltid, lptid
decode_table[ANSWER_LAST_IDS] = _decodeAnswerLastIDs
def _decodeAskPartitionTable(body):
try:
n = unpack('!L', body[:4])[0]
offset_list = []
for i in xrange(n):
offset = unpack('!L', body[4+i*4:8+i*4])[0]
offset_list.append(offset)
except struct.error, msg:
raise PacketMalformedError('invalid ask partition table')
return (offset_list,)
decode_table[ASK_PARTITION_TABLE] = _decodeAskPartitionTable
def _decodeAnswerPartitionTable(body):
try:
ptid, n = unpack('!8sL', body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
cell_list.append((offset, uuid, state))
except struct.error, msg:
raise PacketMalformedError('invalid notify partition changes')
return ptid, cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
def _decodeStartOperation(self):
pass
decode_table[START_OPERATION] = _decodeStartOperation
def _decodeStopOperation(self):
pass
decode_table[STOP_OPERATION] = _decodeStopOperation
def _decodeAskUnfinishedTransactions(self):
pass
decode_table[ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
def _decodeAnswerUnfinishedTransactions(self):
try:
n = unpack('!L', self._body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except struct.error, msg:
raise PacketMalformedError('invalid answer unfinished transactions')
return (tid_list,)
decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
def _decodeAskObjectPresent(self):
try:
oid, tid = unpack('8s8s', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object present')
return oid, tid
decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
def _decodeAnswerObjectPresent(self):
try:
oid, tid = unpack('8s8s', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid answer object present')
return oid, tid
decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
def _decodeDeleteTransaction(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid delete transaction')
return (tid,)
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
def _decodeCommitTransaction(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid commit transaction')
return (tid,)
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
def _decodeAskNewTID(self):
pass
decode_table[ASK_NEW_TID] = _decodeAskNewTID
def _decodeAnswerNewTID(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid answer new tid')
return (tid,)
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeAskNewOIDs(self):
try:
num_oids = unpack('!H', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid ask new oids')
return (num_oids,)
decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
def _decodeAnswerNewOIDs(self):
try:
n = unpack('!H', self._body[:2])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[2+i*8:10+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer new oids')
return (oid_list,)
decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
def _decodeFinishTransaction(self):
try:
tid, n = unpack('!8sL', self._body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[12+i*8:20+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid finish transaction')
return oid_list, tid
decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction
def _decodeNotifyTransactionFinished(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid notify transactin finished')
return (tid,)
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
def _decodeLockInformation(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid lock information')
return (tid,)
decode_table[LOCK_INFORMATION] = _decodeLockInformation
def _decodeNotifyInformationLocked(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid notify information locked')
return (tid,)
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
def _decodeInvalidateObjects(self):
try:
tid, n = unpack('!8sL', self._body[:12])
oid_list = []
for i in xrange(12, 12 + n * 8, 8):
oid = unpack('8s', self._body[i:i+8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid finish transaction')
return oid_list, tid
decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
def _decodeUnlockInformation(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid unlock information')
return (tid,)
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
def _decodeAbortTransaction(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid abort transaction')
return (tid,)
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
def _decodeAskStoreObject(self):
try:
oid, serial, tid, compression, checksum, data_len \
= unpack('!8s8s8sBLL', self._body[:33])
data = self._body[33:]
except struct.error, msg:
raise PacketMalformedError('invalid ask store object')
if data_len != len(data):
raise PacketMalformedError('invalid data size')
return oid, serial, compression, checksum, data, tid
decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
def _decodeAnswerStoreObject(self):
try:
conflicting, oid, serial = unpack('!B8s8s', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid answer store object')
return conflicting, oid, serial
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
def _decodeAskStoreTransaction(self):
try:
tid, oid_len, user_len, desc_len, ext_len \
= unpack('!8sLHHH', self._body[:18])
offset = 18
user = self._body[offset:offset+user_len]
offset += user_len
desc = self._body[offset:offset+desc_len]
offset += desc_len
ext = self._body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', self._body[offset:offset+8])[0]
offset += 8
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid ask store transaction')
return tid, user, desc, ext, oid_list
decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
def _decodeAnswerStoreTransaction(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid answer store transaction')
return (tid,)
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
def _decodeAskObject(self):
try:
oid, serial, tid = unpack('8s8s8s', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object')
return oid, serial, tid
decode_table[ASK_OBJECT] = _decodeAskObject
def _decodeAnswerObject(self):
try:
oid, serial_start, serial_end, compression, checksum, data_len \
= unpack('!8s8s8sBLL', self._body[:33])
data = self._body[33:]
except struct.error, msg:
raise PacketMalformedError('invalid answer object')
if len(data) != data_len:
raise PacketMalformedError('invalid data size')
return oid, serial_start, serial_end, compression, checksum, data
decode_table[ANSWER_OBJECT] = _decodeAnswerObject
def _decodeAskTIDs(self):
try:
first, last, partition = unpack('!QQL', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid ask tids')
return first, last, partition
decode_table[ASK_TIDS] = _decodeAskTIDs
def _decodeAnswerTIDs(self):
try:
n = unpack('!L', self._body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', self._body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except struct.error, msg:
raise PacketMalformedError('invalid answer tids')
return (tid_list,)
decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
def _decodeAskTransactionInformation(self):
try:
tid = unpack('8s', self._body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid ask transaction information')
return (tid,)
decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
def _decodeAnswerTransactionInformation(self):
try:
tid, user_len, desc_len, ext_len, oid_len \
= unpack('!8sHHHL', self._body[:18])
offset = 18
user = self._body[offset:offset+user_len]
offset += user_len
desc = self._body[offset:offset+desc_len]
offset += desc_len
ext = self._body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', self._body[offset+i*8:offset+8+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer transaction information')
return tid, user, desc, ext, oid_list
decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
def _decodeAskObjectHistory(self):
try:
oid, first, last = unpack('!8sQQ', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object history')
return oid, first, last
decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
def _decodeAnswerObjectHistory(self):
try:
oid, length = unpack('!8sL', self._body[:12])
history_list = []
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', self._body[i:i+12])
history_list.append((serial, size))
except struct.error, msg:
raise PacketMalformedError('invalid answer object history')
return oid, history_list
decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
def _decodeAskOIDs(self):
try:
first, last, partition = unpack('!QQL', self._body)
except struct.error, msg:
raise PacketMalformedError('invalid ask oids')
return first, last, partition
decode_table[ASK_OIDS] = _decodeAskOIDs
def _decodeAnswerOIDs(self):
try:
n = unpack('!L', self._body[:4])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', self._body[4+i*8:12+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer oids')
return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
# Packet constructors
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
except struct.error, msg:
raise PacketMalformedError('invalid answer partition table')
return ptid, row_list
decode_table[ANSWER_PARTITION_TABLE] = _decodeAnswerPartitionTable
def _decodeSendPartitionTable(body):
try:
ptid, n = unpack('!8sL', body[:12])
index = 12
row_list = []
cell_list = []
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
state = partition_cell_states.get(state)
cell_list.append((uuid, state))
row_list.append((offset, tuple(cell_list)))
del cell_list[:]
except struct.error, msg:
raise PacketMalformedError('invalid send partition table')
return ptid, row_list
decode_table[SEND_PARTITION_TABLE] = _decodeSendPartitionTable
def _decodeNotifyPartitionChanges(body):
try:
ptid, n = unpack('!8sL', body[:12])
cell_list = []
for i in xrange(n):
offset, uuid, state = unpack('!L16sH', body[12+i*22:34+i*22])
state = partition_cell_states.get(state)
cell_list.append((offset, uuid, state))
except struct.error, msg:
raise PacketMalformedError('invalid notify partition changes')
return ptid, cell_list
decode_table[NOTIFY_PARTITION_CHANGES] = _decodeNotifyPartitionChanges
def _decodeStartOperation(body):
pass
decode_table[START_OPERATION] = _decodeStartOperation
def _decodeStopOperation(body):
pass
decode_table[STOP_OPERATION] = _decodeStopOperation
def _decodeAskUnfinishedTransactions(body):
pass
decode_table[ASK_UNFINISHED_TRANSACTIONS] = _decodeAskUnfinishedTransactions
def _decodeAnswerUnfinishedTransactions(body):
try:
n = unpack('!L', body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except struct.error, msg:
raise PacketMalformedError('invalid answer unfinished transactions')
return (tid_list,)
decode_table[ANSWER_UNFINISHED_TRANSACTIONS] = _decodeAnswerUnfinishedTransactions
def _decodeAskObjectPresent(body):
try:
oid, tid = unpack('8s8s', body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object present')
return oid, tid
decode_table[ASK_OBJECT_PRESENT] = _decodeAskObjectPresent
def _decodeAnswerObjectPresent(body):
try:
oid, tid = unpack('8s8s', body)
except struct.error, msg:
raise PacketMalformedError('invalid answer object present')
return oid, tid
decode_table[ANSWER_OBJECT_PRESENT] = _decodeAnswerObjectPresent
def _decodeDeleteTransaction(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid delete transaction')
return (tid,)
decode_table[DELETE_TRANSACTION] = _decodeDeleteTransaction
def _decodeCommitTransaction(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid commit transaction')
return (tid,)
decode_table[COMMIT_TRANSACTION] = _decodeCommitTransaction
def _decodeAskNewTID(body):
pass
decode_table[ASK_NEW_TID] = _decodeAskNewTID
def _decodeAnswerNewTID(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid answer new tid')
return (tid,)
decode_table[ANSWER_NEW_TID] = _decodeAnswerNewTID
def _decodeAskNewOIDs(body):
try:
num_oids = unpack('!H', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid ask new oids')
return (num_oids,)
decode_table[ASK_NEW_OIDS] = _decodeAskNewOIDs
def _decodeAnswerNewOIDs(body):
try:
n = unpack('!H', body[:2])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[2+i*8:10+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer new oids')
return (oid_list,)
decode_table[ANSWER_NEW_OIDS] = _decodeAnswerNewOIDs
def _decodeFinishTransaction(body):
try:
tid, n = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[12+i*8:20+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid finish transaction')
return oid_list, tid
decode_table[FINISH_TRANSACTION] = _decodeFinishTransaction
def _decodeNotifyTransactionFinished(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid notify transactin finished')
return (tid,)
decode_table[NOTIFY_TRANSACTION_FINISHED] = _decodeNotifyTransactionFinished
def _decodeLockInformation(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid lock information')
return (tid,)
decode_table[LOCK_INFORMATION] = _decodeLockInformation
def _decodeNotifyInformationLocked(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid notify information locked')
return (tid,)
decode_table[NOTIFY_INFORMATION_LOCKED] = _decodeNotifyInformationLocked
def _decodeInvalidateObjects(body):
try:
tid, n = unpack('!8sL', body[:12])
oid_list = []
for i in xrange(12, 12 + n * 8, 8):
oid = unpack('8s', body[i:i+8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid finish transaction')
return oid_list, tid
decode_table[INVALIDATE_OBJECTS] = _decodeInvalidateObjects
def _decodeUnlockInformation(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid unlock information')
return (tid,)
decode_table[UNLOCK_INFORMATION] = _decodeUnlockInformation
def _decodeAbortTransaction(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid abort transaction')
return (tid,)
decode_table[ABORT_TRANSACTION] = _decodeAbortTransaction
def _decodeAskStoreObject(body):
try:
oid, serial, tid, compression, checksum, data_len \
= unpack('!8s8s8sBLL', body[:33])
data = body[33:]
except struct.error, msg:
raise PacketMalformedError('invalid ask store object')
if data_len != len(data):
raise PacketMalformedError('invalid data size')
return oid, serial, compression, checksum, data, tid
decode_table[ASK_STORE_OBJECT] = _decodeAskStoreObject
def _decodeAnswerStoreObject(body):
try:
conflicting, oid, serial = unpack('!B8s8s', body)
except struct.error, msg:
raise PacketMalformedError('invalid answer store object')
return conflicting, oid, serial
decode_table[ANSWER_STORE_OBJECT] = _decodeAnswerStoreObject
def _decodeAskStoreTransaction(body):
try:
tid, oid_len, user_len, desc_len, ext_len \
= unpack('!8sLHHH', body[:18])
offset = 18
user = body[offset:offset+user_len]
offset += user_len
desc = body[offset:offset+desc_len]
offset += desc_len
ext = body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', body[offset:offset+8])[0]
offset += 8
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid ask store transaction')
return tid, user, desc, ext, oid_list
decode_table[ASK_STORE_TRANSACTION] = _decodeAskStoreTransaction
def _decodeAnswerStoreTransaction(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid answer store transaction')
return (tid,)
decode_table[ANSWER_STORE_TRANSACTION] = _decodeAnswerStoreTransaction
def _decodeAskObject(body):
try:
oid, serial, tid = unpack('8s8s8s', body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object')
return oid, serial, tid
decode_table[ASK_OBJECT] = _decodeAskObject
def _decodeAnswerObject(body):
try:
oid, serial_start, serial_end, compression, checksum, data_len \
= unpack('!8s8s8sBLL', body[:33])
data = body[33:]
except struct.error, msg:
raise PacketMalformedError('invalid answer object')
if len(data) != data_len:
raise PacketMalformedError('invalid data size')
return oid, serial_start, serial_end, compression, checksum, data
decode_table[ANSWER_OBJECT] = _decodeAnswerObject
def _decodeAskTIDs(body):
try:
first, last, partition = unpack('!QQL', body)
except struct.error, msg:
raise PacketMalformedError('invalid ask tids')
return first, last, partition
decode_table[ASK_TIDS] = _decodeAskTIDs
def _decodeAnswerTIDs(body):
try:
n = unpack('!L', body[:4])[0]
tid_list = []
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
tid_list.append(tid)
except struct.error, msg:
raise PacketMalformedError('invalid answer tids')
return (tid_list,)
decode_table[ANSWER_TIDS] = _decodeAnswerTIDs
def _decodeAskTransactionInformation(body):
try:
tid = unpack('8s', body)[0]
except struct.error, msg:
raise PacketMalformedError('invalid ask transaction information')
return (tid,)
decode_table[ASK_TRANSACTION_INFORMATION] = _decodeAskTransactionInformation
def _decodeAnswerTransactionInformation(body):
try:
tid, user_len, desc_len, ext_len, oid_len \
= unpack('!8sHHHL', body[:18])
offset = 18
user = body[offset:offset+user_len]
offset += user_len
desc = body[offset:offset+desc_len]
offset += desc_len
ext = body[offset:offset+ext_len]
offset += ext_len
oid_list = []
for i in xrange(oid_len):
oid = unpack('8s', body[offset+i*8:offset+8+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer transaction information')
return tid, user, desc, ext, oid_list
decode_table[ANSWER_TRANSACTION_INFORMATION] = _decodeAnswerTransactionInformation
def _decodeAskObjectHistory(body):
try:
oid, first, last = unpack('!8sQQ', body)
except struct.error, msg:
raise PacketMalformedError('invalid ask object history')
return oid, first, last
decode_table[ASK_OBJECT_HISTORY] = _decodeAskObjectHistory
def _decodeAnswerObjectHistory(body):
try:
oid, length = unpack('!8sL', body[:12])
history_list = []
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', body[i:i+12])
history_list.append((serial, size))
except struct.error, msg:
raise PacketMalformedError('invalid answer object history')
return oid, history_list
decode_table[ANSWER_OBJECT_HISTORY] = _decodeAnswerObjectHistory
def _decodeAskOIDs(body):
try:
first, last, partition = unpack('!QQL', body)
except struct.error, msg:
raise PacketMalformedError('invalid ask oids')
return first, last, partition
decode_table[ASK_OIDS] = _decodeAskOIDs
def _decodeAnswerOIDs(body):
try:
n = unpack('!L', body[:4])[0]
oid_list = []
for i in xrange(n):
oid = unpack('8s', body[4+i*8:12+i*8])[0]
oid_list.append(oid)
except struct.error, msg:
raise PacketMalformedError('invalid answer oids')
return (oid_list,)
decode_table[ANSWER_OIDS] = _decodeAnswerOIDs
# Packet encoding
def _error(error_code, error_message):
body = pack('!HL', error_code, len(error_message)) + error_message
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment