Commit 9ab70be0 authored by Vincent Pelletier's avatar Vincent Pelletier

Factorise struct declarations.

Reduces the amount of duplicated (or deductible) information in packet
generators and parsers.

git-svn-id: 71dcc9de-d417-0410-9af5-da40c76e7ee4
parent d798a8ec
......@@ -15,7 +15,7 @@
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
from struct import pack, unpack, error
from struct import pack, unpack, error, calcsize
from socket import inet_ntoa, inet_aton
from neo.util import Enum
......@@ -200,6 +200,8 @@ def _readString(buf, name, offset=0):
raise PacketMalformedError("can't read string <%s>" % name)
return (string, buf[offset+size:])
def _writeString(buf):
return pack('!L', len(buf)) + buf
class Packet(object):
......@@ -208,6 +210,8 @@ class Packet(object):
a tuple respectively.
_header_format = None
_header_len = None
_request = None
_answer = None
_body = None
......@@ -311,17 +315,20 @@ class RequestIdentification(Packet):
Request a node identification. This must be the first packet for any
connection. Any -> Any.
_header_format = '!LLH16s6s'
def _encode(self, node_type, uuid, address, name):
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
node_type, uuid, address, len(name)) + name
return pack(self._header_format, PROTOCOL_VERSION[0],
PROTOCOL_VERSION[1], node_type, uuid, address) + \
def _decode(self, body):
r = unpack('!LLH16s6s', body[:32])
r = unpack(self._header_format, body[:self._header_len])
major, minor, node_type, uuid, address = r
address = _decodeAddress(address)
(name, _) = _readString(body, 'name', offset=32)
(name, _) = _readString(body, 'name', offset=self._header_len)
node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid)
if (major, minor) != PROTOCOL_VERSION:
......@@ -333,15 +340,17 @@ class AcceptIdentification(Packet):
Accept a node identification. This should be a reply to Request Node
Identification. Any -> Any.
_header_format = '!H16sLL16s'
def _encode(self, node_type, uuid,
num_partitions, num_replicas, your_uuid):
uuid = _encodeUUID(uuid)
your_uuid = _encodeUUID(your_uuid)
return pack('!H16sLL16s', node_type, uuid,
return pack(self._header_format, node_type, uuid,
num_partitions, num_replicas, your_uuid)
def _decode(self, body):
r = unpack('!H16sLL16s', body)
r = unpack(self._header_format, body)
node_type, uuid, num_partitions, num_replicas, your_uuid = r
node_type = _decodeNodeType(node_type)
uuid = _decodeUUID(uuid)
......@@ -360,20 +369,32 @@ class AnswerPrimary(Packet):
Reply to Ask Primary Master. This message includes a list of known master
nodes to make sure that a peer has the same information. M -> Any.
_header_format = '!16sL'
_list_entry_format = '!6s16s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, primary_uuid, known_master_list):
primary_uuid = _encodeUUID(primary_uuid)
body = [primary_uuid, pack('!L', len(known_master_list))]
body = [pack(self._header_format, primary_uuid,
for address, uuid in known_master_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!6s16s', address, uuid))
body.append(pack(self._list_entry_format, address, uuid))
return ''.join(body)
def _decode(self, body):
(primary_uuid, n) = unpack('!16sL', body[:20])
packet_offset = self._header_len
(primary_uuid, n) = unpack(self._header_format,
known_master_list = []
list_entry_len = self._list_entry_len
list_entry_format = self._list_entry_format
for i in xrange(n):
address, uuid = unpack('!6s16s', body[20+i*22:42+i*22])
next_packet_offset = packet_offset + list_entry_len
address, uuid = unpack(list_entry_format,
packet_offset = next_packet_offset
address = _decodeAddress(address)
uuid = _decodeUUID(uuid)
known_master_list.append((address, uuid))
......@@ -423,17 +444,28 @@ class AskPartitionTable(Packet):
Ask rows in a partition table that a storage node stores. Used to recover
information. PM -> S.
_header_format = '!L'
_list_entry_format = '!L'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, offset_list):
body = [pack('!L', len(offset_list))]
body = [pack(self._header_format, len(offset_list))]
list_entry_format = self._list_entry_format
for offset in offset_list:
body.append(pack('!L', offset))
body.append(pack(list_entry_format, offset))
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
packet_offset = self._header_len
(n,) = unpack(self._header_format, body[:packet_offset])
offset_list = []
list_entry_len = self._list_entry_len
list_entry_format = self._list_entry_format
for i in xrange(n):
offset = unpack('!L', body[4+i*4:8+i*4])[0]
next_packet_offset = packet_offset + list_entry_len
offset = unpack(list_entry_format,
packet_offset = next_packet_offset
return (offset_list,)
......@@ -441,28 +473,42 @@ class AnswerPartitionTable(Packet):
Answer rows in a partition table. S -> PM.
_header_format = '!8sL'
_row_entry_format = '!LL'
_row_entry_len = calcsize(_row_entry_format)
_cell_entry_format = '!16sH'
_cell_entry_len = calcsize(_cell_entry_format)
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
index = self._header_len
(ptid, n) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
......@@ -474,28 +520,42 @@ class SendPartitionTable(Packet):
Send rows in a partition table to update other nodes. PM -> S, C.
_header_format = '!8sL'
_row_entry_format = '!LL'
_row_entry_len = calcsize(_row_entry_format)
_cell_entry_format = '!16sH'
_cell_entry_len = calcsize(_cell_entry_format)
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n,) = unpack('!8sL', body[:index])
index = self._header_len
(ptid, n,) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
......@@ -508,20 +568,31 @@ class NotifyPartitionChanges(Packet):
Notify a subset of a partition table. This is used to notify changes.
PM -> S, C.
_header_format = '!8sL'
_list_entry_format = '!L16sH'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, ptid, cell_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(cell_list))]
body = [pack(self._header_format, ptid, len(cell_list))]
list_entry_format = self._list_entry_format
for offset, uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!L16sH', offset, uuid, state))
body.append(pack(list_entry_format, offset, uuid, state))
return ''.join(body)
def _decode(self, body):
(ptid, n) = unpack('!8sL', body[:12])
packet_offset = self._header_len
(ptid, n) = unpack(self._header_format, body[:packet_offset])
ptid = _decodePTID(ptid)
cell_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
(offset, uuid, state) = unpack('!L16sH', body[12+i*22:34+i*22])
next_packet_offset = packet_offset + list_entry_len
(offset, uuid, state) = unpack(list_entry_format,
packet_offset = next_packet_offset
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((offset, uuid, state))
......@@ -533,12 +604,13 @@ class NotifyReplicationDone(Packet):
a storage to another.
S -> M
_header_format = '!L'
def _encode(self, offset):
return pack('!L', offset)
return pack(self._header_format, offset)
def _decode(self, body):
(offset, ) = unpack('!L', body)
(offset, ) = unpack(self._header_format, body)
return (offset, )
class StartOperation(Packet):
......@@ -565,16 +637,25 @@ class AnswerUnfinishedTransactions(Packet):
Answer unfinished transactions S -> PM.
_header_format = '!L'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, tid_list):
body = [pack('!L', len(tid_list))]
body = [pack(self._header_format, len(tid_list))]
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
tid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
next_offset = offset + list_entry_len
tid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (tid_list,)
......@@ -643,16 +724,25 @@ class AskFinishTransaction(Packet):
Finish a transaction. C -> PM.
_header_format = '!8sL'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body = [pack(self._header_format, tid, len(oid_list))]
return ''.join(body)
def _decode(self, body):
(tid, n) = unpack('!8sL', body[:12])
offset = self._header_len
(tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
oid = unpack('8s', body[12+i*8:20+i*8])[0]
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (oid_list, tid)
......@@ -693,16 +783,25 @@ class InvalidateObjects(Packet):
Invalidate objects. PM -> C.
_header_format = '!8sL'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid_list, tid):
body = [pack('!8sL', tid, len(oid_list))]
body = [pack(self._header_format, tid, len(oid_list))]
return ''.join(body)
def _decode(self, body):
(tid, n) = unpack('!8sL', body[:12])
offset = self._header_len
(tid, n) = unpack(self._header_format, body[:offset])
oid_list = []
for i in xrange(12, 12 + n * 8, 8):
oid = unpack('8s', body[i:i+8])[0]
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (oid_list, tid)
......@@ -721,26 +820,37 @@ class AskNewOIDs(Packet):
Ask new object IDs. C -> PM.
_header_format = '!H'
def _encode(self, num_oids):
return pack('!H', num_oids)
return pack(self._header_format, num_oids)
def _decode(self, body):
return unpack('!H', body) # num oids
return unpack(self._header_format, body) # num oids
class AnswerNewOIDs(Packet):
Answer new object IDs. PM -> C.
_header_format = '!H'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid_list):
body = [pack('!H', len(oid_list))]
body = [pack(self._header_format, len(oid_list))]
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!H', body[:2])
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
oid = unpack('8s', body[2+i*8:10+i*8])[0]
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (oid_list,)
......@@ -749,16 +859,19 @@ class AskStoreObject(Packet):
Ask to store an object. Send an OID, an original serial, a current
transaction ID, and data. C -> S.
_header_format = '!8s8s8sBL'
def _encode(self, oid, serial, compression, checksum, data, tid):
if serial is None:
serial = INVALID_TID
return pack('!8s8s8sBLL', oid, serial, tid, compression,
checksum, len(data)) + data
return pack(self._header_format, oid, serial, tid, compression,
checksum) + _writeString(data)
def _decode(self, body):
r = unpack('!8s8s8sBL', body[:29])
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
oid, serial, tid, compression, checksum = r
(data, _) = _readString(body, 'data', offset=29)
(data, _) = _readString(body, 'data', offset=header_len)
return (oid, serial, compression, checksum, data, tid)
class AnswerStoreObject(Packet):
......@@ -768,13 +881,15 @@ class AnswerStoreObject(Packet):
if this serial is newer than the current transaction ID, a client
node must not try to resolve the conflict. S -> C.
_header_format = '!B8s8s'
def _encode(self, conflicting, oid, serial):
if serial is None:
serial = INVALID_TID
return pack('!B8s8s', conflicting, oid, serial)
return pack(self._header_format, conflicting, oid, serial)
def _decode(self, body):
(conflicting, oid, serial) = unpack('!B8s8s', body)
(conflicting, oid, serial) = unpack(self._header_format, body)
return (conflicting, oid, serial)
class AbortTransaction(Packet):
......@@ -789,9 +904,11 @@ class AskStoreTransaction(Packet):
Ask to store a transaction. C -> S.
_header_format = '!8sLHHH'
def _encode(self, tid, user, desc, ext, oid_list):
lengths = (len(oid_list), len(user), len(desc), len(ext))
body = [pack('!8sLHHH', tid, *lengths)]
body = [pack(self._header_format, tid, *lengths)]
......@@ -799,9 +916,9 @@ class AskStoreTransaction(Packet):
return ''.join(body)
def _decode(self, body):
r = unpack('!8sLHHH', body[:18])
r = unpack(self._header_format, body[:self._header_len])
tid, oid_len, user_len, desc_len, ext_len = r
body = body[18:]
body = body[self._header_len:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
......@@ -832,13 +949,15 @@ class AskObject(Packet):
is specified, the specified revision of an object will be returned. If
a TID is specified, an object right before the TID will be returned. S,C -> S.
_header_format = '!8s8s8s'
def _encode(self, oid, serial, tid):
tid = _encodeTID(tid)
serial = _encodeTID(serial) # serial is the previous TID
return pack('!8s8s8s', oid, serial, tid)
return pack(self._header_format, oid, serial, tid)
def _decode(self, body):
(oid, serial, tid) = unpack('8s8s8s', body)
(oid, serial, tid) = unpack(self._header_format, body)
if serial == INVALID_TID:
serial = None
tid = _decodeTID(tid)
......@@ -848,21 +967,24 @@ class AnswerObject(Packet):
Answer the requested object. S -> C.
_header_format = '!8s8s8sBL'
def _encode(self, oid, serial_start, serial_end, compression,
checksum, data):
if serial_start is None:
serial_start = INVALID_TID
if serial_end is None:
serial_end = INVALID_TID
return pack('!8s8s8sBLL', oid, serial_start, serial_end,
compression, checksum, len(data)) + data
return pack(self._header_format, oid, serial_start, serial_end,
compression, checksum) + _writeString(data)
def _decode(self, body):
r = unpack('!8s8s8sBL', body[:29])
header_len = self._header_len
r = unpack(self._header_format, body[:header_len])
oid, serial_start, serial_end, compression, checksum = r
if serial_end == INVALID_TID:
serial_end = None
(data, _) = _readString(body, 'data', offset=29)
(data, _) = _readString(body, 'data', offset=header_len)
return (oid, serial_start, serial_end, compression, checksum, data)
class AskTIDs(Packet):
......@@ -870,26 +992,37 @@ class AskTIDs(Packet):
Ask for TIDs between a range of offsets. The order of TIDs is descending,
and the range is [first, last). C, S -> S.
_header_format = '!QQL'
def _encode(self, first, last, partition):
return pack('!QQL', first, last, partition)
return pack(self._header_format, first, last, partition)
def _decode(self, body):
return unpack('!QQL', body) # first, last, partition
return unpack(self._header_format, body) # first, last, partition
class AnswerTIDs(Packet):
Answer the requested TIDs. S -> C, S.
_header_format = '!L'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, tid_list):
body = [pack('!L', len(tid_list))]
body = [pack(self._header_format, len(tid_list))]
return ''.join(body)
def _decode(self, body):
(n, ) = unpack('!L', body[:4])
offset = self._header_len
(n, ) = unpack(self._header_format, body[:offset])
tid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
tid = unpack('8s', body[4+i*8:12+i*8])[0]
next_offset = offset + list_entry_len
tid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (tid_list,)
......@@ -905,9 +1038,11 @@ class AnswerTransactionInformation(Packet):
Answer information (user, description) about a transaction. S -> Any.
_header_format = '!8sHHHBL'
def _encode(self, tid, user, desc, ext, packed, oid_list):
packed = packed and 1 or 0
body = [pack('!8sHHHBL', tid, len(user), len(desc), len(ext),
body = [pack(self._header_format, tid, len(user), len(desc), len(ext),
packed, len(oid_list))]
......@@ -916,10 +1051,10 @@ class AnswerTransactionInformation(Packet):
return ''.join(body)
def _decode(self, body):
r = unpack('!8sHHHBL', body[:19])
r = unpack(self._header_format, body[:self._header_len])
tid, user_len, desc_len, ext_len, packed, oid_len = r
packed = bool(packed)
body = body[19:]
body = body[self._header_len:]
user = body[:user_len]
body = body[user_len:]
desc = body[:desc_len]
......@@ -938,28 +1073,40 @@ class AskObjectHistory(Packet):
Ask history information for a given object. The order of serials is
descending, and the range is [first, last]. C, S -> S.
_header_format = '!8sQQ'
def _encode(self, oid, first, last):
return pack('!8sQQ', oid, first, last)
return pack(self._header_format, oid, first, last)
def _decode(self, body):
(oid, first, last) = unpack('!8sQQ', body)
(oid, first, last) = unpack(self._header_format, body)
return (oid, first, last)
class AnswerObjectHistory(Packet):
Answer history information (serial, size) for an object. S -> C, S.
_header_format = '!8sL'
_list_entry_format = '!8sL'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid, history_list):
body = [pack('!8sL', oid, len(history_list))]
body = [pack(self._header_format, oid, len(history_list))]
list_entry_format = self._list_entry_format
for serial, size in history_list:
body.append(pack('!8sL', serial, size))
body.append(pack(list_entry_format, serial, size))
return ''.join(body)
def _decode(self, body):
(oid, length) = unpack('!8sL', body[:12])
offset = self._header_len
(oid, length) = unpack(self._header_format, body[:offset])
history_list = []
for i in xrange(12, 12 + length * 12, 12):
serial, size = unpack('!8sL', body[i:i+12])
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(length):
next_offset = offset + list_entry_len
serial, size = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
history_list.append((serial, size))
return (oid, history_list)
......@@ -968,26 +1115,37 @@ class AskOIDs(Packet):
Ask for OIDs between a range of offsets. The order of OIDs is descending,
and the range is [first, last). S -> S.
_header_format = '!QQL'
def _encode(self, first, last, partition):
return pack('!QQL', first, last, partition)
return pack(self._header_format, first, last, partition)
def _decode(self, body):
return unpack('!QQL', body) # first, last, partition
return unpack(self._header_format, body) # first, last, partition
class AnswerOIDs(Packet):
Answer the requested OIDs. S -> S.
_header_format = '!L'
_list_entry_format = '8s'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, oid_list):
body = [pack('!L', len(oid_list))]
body = [pack(self._header_format, len(oid_list))]
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
oid_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
oid = unpack('8s', body[4+i*8:12+i*8])[0]
next_offset = offset + list_entry_len
oid = unpack(list_entry_format, body[offset:next_offset])[0]
offset = next_offset
return (oid_list,)
......@@ -996,13 +1154,15 @@ class AskPartitionList(Packet):
All the following messages are for neoctl to admin node
Ask information about partition
_header_format = '!LL16s'
def _encode(self, min_offset, max_offset, uuid):
uuid = _encodeUUID(uuid)
body = [pack('!LL16s', min_offset, max_offset, uuid)]
body = [pack(self._header_format, min_offset, max_offset, uuid)]
return ''.join(body)
def _decode(self, body):
(min_offset, max_offset, uuid) = unpack('!LL16s', body)
(min_offset, max_offset, uuid) = unpack(self._header_format, body)
uuid = _decodeUUID(uuid)
return (min_offset, max_offset, uuid)
......@@ -1010,28 +1170,42 @@ class AnswerPartitionList(Packet):
Answer information about partition
_header_format = '!8sL'
_row_entry_format = '!LL'
_row_entry_len = calcsize(_row_entry_format)
_cell_entry_format = '!16sH'
_cell_entry_len = calcsize(_cell_entry_format)
def _encode(self, ptid, row_list):
ptid = _encodePTID(ptid)
body = [pack('!8sL', ptid, len(row_list))]
body = [pack(self._header_format, ptid, len(row_list))]
row_entry_format = self._row_entry_format
cell_entry_format = self._cell_entry_format
for offset, cell_list in row_list:
body.append(pack('!LL', offset, len(cell_list)))
body.append(pack(row_entry_format, offset, len(cell_list)))
for uuid, state in cell_list:
uuid = _encodeUUID(uuid)
body.append(pack('!16sH', uuid, state))
body.append(pack(cell_entry_format, uuid, state))
return ''.join(body)
def _decode(self, body):
index = 12
(ptid, n) = unpack('!8sL', body[:index])
index = self._header_len
(ptid, n) = unpack(self._header_format, body[:index])
ptid = _decodePTID(ptid)
row_list = []
cell_list = []
row_entry_format = self._row_entry_format
row_entry_len = self._row_entry_len
cell_entry_format = self._cell_entry_format
cell_entry_len = self._cell_entry_len
for i in xrange(n):
offset, m = unpack('!LL', body[index:index+8])
index += 8
next_index = index + row_entry_len
offset, m = unpack(row_entry_format, body[index:next_index])
index = next_index
for j in xrange(m):
uuid, state = unpack('!16sH', body[index:index+18])
index += 18
next_index = index + cell_entry_len
uuid, state = unpack(cell_entry_format, body[index:next_index])
index = next_index
state = CellStates.get(state)
uuid = _decodeUUID(uuid)
cell_list.append((uuid, state))
......@@ -1043,11 +1217,13 @@ class AskNodeList(Packet):
Ask information about nodes
_header_format = '!H'
def _encode(self, node_type):
return ''.join([pack('!H', node_type)])
return ''.join([pack(self._header_format, node_type)])
def _decode(self, body):
(node_type, ) = unpack('!H', body)
(node_type, ) = unpack(self._header_format, body)
node_type = _decodeNodeType(node_type)
return (node_type,)
......@@ -1055,19 +1231,30 @@ class AnswerNodeList(Packet):
Answer information about nodes
_header_format = '!L'
_list_entry_format = '!H6s16sH'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, node_list):
body = [pack('!L', len(node_list))]
body = [pack(self._header_format, len(node_list))]
list_entry_format = self._list_entry_format
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body.append(pack(list_entry_format, node_type, address, uuid,
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
node_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
next_offset = offset + list_entry_len
r = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
......@@ -1080,12 +1267,15 @@ class SetNodeState(Packet):
Set the node state
_header_format = '!16sHB'
def _encode(self, uuid, state, modify_partition_table):
uuid = _encodeUUID(uuid)
return ''.join([pack('!16sHB', uuid, state, modify_partition_table)])
return ''.join([pack(self._header_format, uuid, state,
def _decode(self, body):
(uuid, state, modify) = unpack('!16sHB', body)
(uuid, state, modify) = unpack(self._header_format, body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state, modify)
......@@ -1094,12 +1284,14 @@ class AnswerNodeState(Packet):
Answer state of the node
_header_format = '!16sH'
def _encode(self, uuid, state):
uuid = _encodeUUID(uuid)
return ''.join([pack('!16sH', uuid, state)])
return ''.join([pack(self._header_format, uuid, state)])
def _decode(self, body):
(uuid, state) = unpack('!16sH', body)
(uuid, state) = unpack(self._header_format, body)
state = _decodeNodeState(state)
uuid = _decodeUUID(uuid)
return (uuid, state)
......@@ -1108,14 +1300,25 @@ class AddPendingNodes(Packet):
Ask the primary to include some pending node in the partition table
_header_format = '!H'
_list_header_format = '!16s'
_list_header_len = calcsize(_list_header_format)
def _encode(self, uuid_list=()):
list_header_format = self._list_header_format
# an empty list means all current pending nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
return pack('!H', len(uuid_list)) + ''.join(uuid_list)
uuid_list = [pack(list_header_format, _encodeUUID(uuid)) \
for uuid in uuid_list]
return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
def _decode(self, body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
header_len = self._header_len
(n, ) = unpack(self._header_format, body[:header_len])
list_header_format = self._list_header_format
list_header_len = self._list_header_len
uuid_list = [unpack(list_header_format,
header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
uuid_list = [_decodeUUID(x) for x in uuid_list]
return (uuid_list, )
......@@ -1123,14 +1326,24 @@ class AnswerNewNodes(Packet):
Answer what are the nodes added in the partition table
_header_format = '!H'
_list_header_format = '!16s'
_list_header_len = calcsize(_list_header_format)
def _encode(self, uuid_list):
list_header_format = self._list_header_format
# an empty list means no new nodes
uuid_list = [pack('!16s', _encodeUUID(uuid)) for uuid in uuid_list]
return pack('!H', len(uuid_list)) + ''.join(uuid_list)
uuid_list = [pack(list_header_format, _encodeUUID(uuid)) for uuid in uuid_list]
return pack(self._header_format, len(uuid_list)) + ''.join(uuid_list)
def _decode(self, body):
(n, ) = unpack('!H', body[:2])
uuid_list = [unpack('!16s', body[2+i*16:18+i*16])[0] for i in xrange(n)]
header_len = self._header_len
(n, ) = unpack(self._header_format, body[:header_len])
list_header_format = self._list_header_format
list_header_len = self._list_header_len
uuid_list = [unpack(list_header_format,
header_len+(i+1)*list_header_len])[0] for i in xrange(n)]
uuid_list = [_decodeUUID(x) for x in uuid_list]
return (uuid_list, )
......@@ -1138,19 +1351,30 @@ class NotifyNodeInformation(Packet):
Notify information about one or more nodes. PM -> Any.
_header_format = '!L'
_list_entry_format = '!H6s16sH'
_list_entry_len = calcsize(_list_entry_format)
def _encode(self, node_list):
body = [pack('!L', len(node_list))]
body = [pack(self._header_format, len(node_list))]
list_entry_format = self._list_entry_format
for node_type, address, uuid, state in node_list:
uuid = _encodeUUID(uuid)
address = _encodeAddress(address)
body.append(pack('!H6s16sH', node_type, address, uuid, state))
body.append(pack(list_entry_format, node_type, address, uuid,
return ''.join(body)
def _decode(self, body):
(n,) = unpack('!L', body[:4])
offset = self._header_len
(n,) = unpack(self._header_format, body[:offset])
node_list = []
list_entry_format = self._list_entry_format
list_entry_len = self._list_entry_len
for i in xrange(n):
r = unpack('!H6s16sH', body[4+i*26:30+i*26])
next_offset = offset + list_entry_len
r = unpack(list_entry_format, body[offset:next_offset])
offset = next_offset
node_type, address, uuid, state = r
address = _decodeAddress(address)
node_type = _decodeNodeType(node_type)
......@@ -1175,11 +1399,13 @@ class SetClusterState(Packet):
Set the cluster state
_header_format = '!H'
def _encode(self, state):
return pack('!H', state)
return pack(self._header_format, state)
def _decode(self, body):
(state, ) = unpack('!H', body[:2])
(state, ) = unpack(self._header_format, body[:self._header_len])
state = _decodeClusterState(state)
return (state, )
......@@ -1187,11 +1413,13 @@ class NotifyClusterInformation(Packet):
Notify information about the cluster
_header_format = '!H'
def _encode(self, state):
return pack('!H', state)
return pack(self._header_format, state)
def _decode(self, body):
(state, ) = unpack('!H', body)
(state, ) = unpack(self._header_format, body)
state = _decodeClusterState(state)
return (state, )
......@@ -1205,11 +1433,13 @@ class AnswerClusterState(Packet):
Answer state of the cluster
_header_format = '!H'
def _encode(self, state):
return pack('!H', state)
return pack(self._header_format, state)
def _decode(self, body):
(state, ) = unpack('!H', body)
(state, ) = unpack(self._header_format, body)
state = _decodeClusterState(state)
return (state, )
......@@ -1227,26 +1457,35 @@ class Error(Packet):
any other message, even if such a message does not expect a reply
usually. Any -> Any.
_header_format = '!H'
def _encode(self, code, message):
return pack('!HL', code, len(message)) + message
return pack(self._header_format, code) + _writeString(message)
def _decode(self, body):
(code, ) = unpack('!H', body[:2])
offset = self._header_len
(code, ) = unpack(self._header_format, body[:offset])
code = _decodeErrorCode(code)
(message, _) = _readString(body, 'message', offset=2)
(message, _) = _readString(body, 'message', offset=offset)
return (code, message)
def initMessage(klass):
if klass._header_format is not None:
klass._header_len = calcsize(klass._header_format)
StaticRegistry = {}
def register(code, request, answer=None):
""" Register a packet in the packet registry """
# register the request
# assert code & RESPONSE_MASK == 0
assert code not in StaticRegistry, "Duplicate request packet code"
request._code = code
request._answer = answer
StaticRegistry[code] = request
if answer not in (None, Error):
# compute the answer code
code = code | RESPONSE_MASK
answer._request = request
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment