Commit ea5f7d61 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/proto: Serialization support

Provide a way for every message to be encoded/decoded to/from NEO wire
encoding. For this introduce Msg interface with wire coding methods and
provide such methods for all message types.

For selected types the methods are implemented manually.
For most of the types the methods are generated automatically by protogen.go program.

protogen.go was mentioned in http://navytux.spb.ru/~kirr/neo.html#development-overview
in "On server-side NEO/go work started by first implementing messages
serialization in exactly the same wire format as NEO/py does ..." paragraph.

A bit of late protogen fixups history:

	lab.nexedi.com/kirr/neo/commit/c884bfd5
	lab.nexedi.com/kirr/neo/commit/385d813a
	lab.nexedi.com/kirr/neo/commit/0f7e0b00
	lab.nexedi.com/kirr/neo/commit/de3ef2c0

Also a message type can be reverse-looked up by message code via MsgType().
This will be later used in network receive code path.
parent fcd6f9f6
......@@ -26,11 +26,20 @@ import (
"fmt"
"math"
"net"
"reflect"
"strconv"
"strings"
"time"
)
// MsgType looks up message type by message code.
//
// Nil is returned if message code is not valid.
func MsgType(msgCode uint16) reflect.Type {
return msgTypeRegistry[msgCode]
}
func (e *Error) Error() string {
// NOTE here, not in proto.go - because else stringer will be confused.
// XXX better translate to some other errors?
......
......@@ -16,7 +16,8 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package proto provides definition of NEO messages.
// Package proto provides definition of NEO messages and their marshalling
// to/from wire format.
//
// Two NEO nodes can exchange messages over underlying network link after
// performing NEO-specific handshake. A message is sent as a packet specifying
......@@ -25,9 +26,12 @@
//
// PktHeader describes packet header structure.
//
// Messages are represented by corresponding types.
// Messages are represented by corresponding types that all implement Msg interface.
//
// The proto packages provides only message definitions.
// A message type can be looked up by message code with MsgType.
//
// The proto packages provides only message definitions and low-level
// primitives for their marshalling.
package proto
// This file defines everything that relates to messages on the wire.
......@@ -38,14 +42,19 @@ package proto
// several messages and does not itself denote a separate message, its
// definition is prefixed with `//neo:proto typeonly` comment.
//
// The order of message definitions is significant - messages will be assigned
// The order of message definitions is significant - messages are assigned
// message codes in the same order they are defined.
//
// For compatibility with neo/py a message will have its code assigned with "answer"
// For compatibility with neo/py a message has its code assigned with "answer"
// bit set if either message name starts with "Answer" or message definition is
// prefixed with `//neo:proto answer` comment.
//
// Packet structure and messages are the same as in neo/py (see protocol.py).
// Packet structure and messages are bit-to-bit compatible with neo/py (see protocol.py).
//
// The code to marshal/unmarshal messages is generated by protogen.go .
//go:generate sh -c "go run protogen.go >zproto-marshal.go"
// TODO regroup messages definitions to stay more close to 1 communication topic
// TODO document protocol itself better (who sends who what with which semantic)
......@@ -60,6 +69,10 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/packed"
"encoding/binary"
"errors"
"math"
)
const (
......@@ -95,6 +108,29 @@ type PktHeader struct {
MsgLen packed.BE32 // payload message length (excluding packet header)
}
// Msg is the interface implemented by all NEO messages.
type Msg interface {
// marshal/unmarshal into/from wire format:
// NEOMsgCode returns message code needed to be used for particular message type
// on the wire.
NEOMsgCode() uint16
// NEOMsgEncodedLen returns how much space is needed to encode current message payload.
NEOMsgEncodedLen() int
// NEOMsgEncode encodes current message state into buf.
//
// len(buf) must be >= neoMsgEncodedLen().
NEOMsgEncode(buf []byte)
// NEOMsgDecode decodes data into message in-place.
NEOMsgDecode(data []byte) (nread int, err error)
}
// ErrDecodeOverflow is the error returned by neoMsgDecode when decoding hits buffer overflow
var ErrDecodeOverflow = errors.New("decode: buffer overflow")
// ---- messages ----
type ErrorCode uint32
......@@ -212,6 +248,42 @@ type Address struct {
Port uint16
}
// NOTE if Host == "" -> Port not added to wire (see py.PAddress):
func (a *Address) neoEncodedLen() int {
l := string_neoEncodedLen(a.Host)
if a.Host != "" {
l += 2
}
return l
}
func (a *Address) neoEncode(b []byte) int {
n := string_neoEncode(a.Host, b[0:])
if a.Host != "" {
binary.BigEndian.PutUint16(b[n:], a.Port)
n += 2
}
return n
}
func (a *Address) neoDecode(b []byte) (uint64, bool) {
n, ok := string_neoDecode(&a.Host, b)
if !ok {
return 0, false
}
if a.Host != "" {
b = b[n:]
if len(b) < 2 {
return 0, false
}
a.Port = binary.BigEndian.Uint16(b)
n += 2
} else {
a.Port = 0
}
return n, true
}
// Checksum is a SHA1 hash.
type Checksum [20]byte
......@@ -223,6 +295,33 @@ type PTid uint64
// IdTime represents time of identification.
type IdTime float64
func (t IdTime) neoEncodedLen() int {
return 8
}
func (t IdTime) neoEncode(b []byte) int {
// use -inf as value for no data (NaN != NaN -> hard to use NaN in tests)
// NOTE neo/py uses None for "no data"; we use 0 for "no data" to avoid pointer
tt := float64(t)
if tt == math.Inf(-1) {
tt = math.NaN()
}
float64_neoEncode(b, tt)
return 8
}
func (t *IdTime) neoDecode(data []byte) (uint64, bool) {
if len(data) < 8 {
return 0, false
}
tt := float64_neoDecode(data)
if math.IsNaN(tt) {
tt = math.Inf(-1)
}
*t = IdTime(tt)
return 8, true
}
// NodeInfo is information about a node.
//
//neo:proto typeonly
......@@ -974,3 +1073,74 @@ type Truncate struct {
// answer = Error
}
// ---- runtime support for protogen and custom codecs ----
// customCodec is the interface that is implemented by types with custom encodings.
//
// its semantic is very similar to Msg.
type customCodec interface {
neoEncodedLen() int
neoEncode(buf []byte) (nwrote int)
neoDecode(data []byte) (nread uint64, ok bool) // XXX uint64 or int here?
}
func byte2bool(b byte) bool {
return b != 0
}
func bool2byte(b bool) byte {
if b {
return 1
} else {
return 0
}
}
// NOTE py.None encodes as '\xff' * 8 (-> we use NaN for None)
// NOTE '\xff' * 8 represents FP NaN but many other NaN bits representations exist
func float64_neoEncode(b []byte, f float64) {
var fu uint64
if !math.IsNaN(f) {
fu = math.Float64bits(f)
} else {
// convert all NaNs to canonical \xff * 8
fu = 1<<64 - 1
}
binary.BigEndian.PutUint64(b, fu)
}
func float64_neoDecode(b []byte) float64 {
fu := binary.BigEndian.Uint64(b)
return math.Float64frombits(fu)
}
// XXX we need string_neo* only for Address
// XXX dup of genSlice1 in protogen.go
func string_neoEncodedLen(s string) int {
return 4 + len(s)
}
func string_neoEncode(s string, data []byte) int {
l := len(s)
binary.BigEndian.PutUint32(data, uint32(l))
copy(data[4:4+l], s) // NOTE [:l] to catch data overflow as copy copies minimal len
return 4 + l
}
func string_neoDecode(sp *string, data []byte) (nread uint64, ok bool) {
if len(data) < 4 {
return 0, false
}
l := binary.BigEndian.Uint32(data)
data = data[4:]
if uint64(len(data)) < uint64(l) {
return 0, false
}
*sp = string(data[:l])
return 4 + uint64(l), true
}
......@@ -18,11 +18,313 @@
// See https://www.nexedi.com/licensing for rationale and options.
package proto
// NEO. protocol encoding tests
import (
"encoding/binary"
hexpkg "encoding/hex"
"fmt"
"reflect"
"runtime"
"strings"
"testing"
"unsafe"
"lab.nexedi.com/kirr/neo/go/zodb"
)
// decode string as hex; panic on error
func hex(s string) string {
b, err := hexpkg.DecodeString(s)
if err != nil {
panic(err)
}
return string(b)
}
// uint16 -> string as encoded on the wire
func u16(v uint16) string {
var b [2]byte
binary.BigEndian.PutUint16(b[:], v)
return string(b[:])
}
// uint32 -> string as encoded on the wire
func u32(v uint32) string {
var b [4]byte
binary.BigEndian.PutUint32(b[:], v)
return string(b[:])
}
// uint64 -> string as encoded on the wire
func u64(v uint64) string {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
return string(b[:])
}
func TestPktHeader(t *testing.T) {
// make sure PktHeader is really packed and its size matches PktHeaderLen
if unsafe.Sizeof(PktHeader{}) != 10 {
t.Fatalf("sizeof(PktHeader) = %v ; want 10", unsafe.Sizeof(PktHeader{}))
}
if unsafe.Sizeof(PktHeader{}) != PktHeaderLen {
t.Fatalf("sizeof(PktHeader) = %v ; want %v", unsafe.Sizeof(PktHeader{}), PktHeaderLen)
}
}
// test marshalling for one message type
func testMsgMarshal(t *testing.T, msg Msg, encoded string) {
typ := reflect.TypeOf(msg).Elem() // type of *msg
msg2 := reflect.New(typ).Interface().(Msg)
defer func() {
if e := recover(); e != nil {
t.Errorf("%v: panic ↓↓↓:", typ)
panic(e) // to show traceback
}
}()
// msg.encode() == expected
msgCode := msg.NEOMsgCode()
n := msg.NEOMsgEncodedLen()
msgType := MsgType(msgCode)
if msgType != typ {
t.Errorf("%v: msgCode = %v which corresponds to %v", typ, msgCode, msgType)
}
if n != len(encoded) {
t.Errorf("%v: encodedLen = %v ; want %v", typ, n, len(encoded))
}
buf := make([]byte, n)
msg.NEOMsgEncode(buf)
if string(buf) != encoded {
t.Errorf("%v: encode result unexpected:", typ)
t.Errorf("\thave: %s", hexpkg.EncodeToString(buf))
t.Errorf("\twant: %s", hexpkg.EncodeToString([]byte(encoded)))
}
// encode must panic if passed a smaller buffer
for l := len(buf) - 1; l >= 0; l-- {
func() {
defer func() {
subj := fmt.Sprintf("%v: encode(buf[:encodedLen-%v])", typ, len(encoded)-l)
e := recover()
if e == nil {
t.Errorf("%s did not panic", subj)
return
}
err, ok := e.(runtime.Error)
if !ok {
t.Errorf("%s panic(%#v) ; want runtime.Error", subj, e)
}
estr := err.Error()
if !(strings.Contains(estr, "slice bounds out of range") ||
strings.Contains(estr, "index out of range")) {
t.Errorf("%s unexpected runtime panic: %v", subj, estr)
}
}()
msg.NEOMsgEncode(buf[:l])
}()
}
// msg.decode() == expected
data := []byte(encoded + "noise")
n, err := msg2.NEOMsgDecode(data)
if err != nil {
t.Errorf("%v: decode error %v", typ, err)
}
if n != len(encoded) {
t.Errorf("%v: nread = %v ; want %v", typ, n, len(encoded))
}
if !reflect.DeepEqual(msg2, msg) {
t.Errorf("%v: decode result unexpected: %v ; want %v", typ, msg2, msg)
}
// decode must detect buffer overflow
for l := len(encoded) - 1; l >= 0; l-- {
n, err = msg2.NEOMsgDecode(data[:l])
if !(n == 0 && err == ErrDecodeOverflow) {
t.Errorf("%v: decode overflow not detected on [:%v]", typ, l)
}
}
}
// test encoding/decoding of messages
func TestMsgMarshal(t *testing.T) {
var testv = []struct {
msg Msg
encoded string // []byte
}{
// empty
{&Ping{}, ""},
// uint32, string
{&Error{Code: 0x01020304, Message: "hello"}, "\x01\x02\x03\x04\x00\x00\x00\x05hello"},
// Oid, Tid, bool, Checksum, []byte
{&StoreObject{
Oid: 0x0102030405060708,
Serial: 0x0a0b0c0d0e0f0102,
Compression: false,
Checksum: Checksum{1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20}, // XXX simpler?
Data: []byte("hello world"),
DataSerial: 0x0a0b0c0d0e0f0103,
Tid: 0x0a0b0c0d0e0f0104,
},
hex("01020304050607080a0b0c0d0e0f010200") +
hex("0102030405060708090a0b0c0d0e0f1011121314") +
hex("0000000b") + "hello world" +
hex("0a0b0c0d0e0f01030a0b0c0d0e0f0104")},
// PTid, [] (of [] of {UUID, CellState})
{&AnswerPartitionTable{
PTid: 0x0102030405060708,
RowList: []RowInfo{
{1, []CellInfo{{11, UP_TO_DATE}, {17, OUT_OF_DATE}}},
{2, []CellInfo{{11, FEEDING}}},
{7, []CellInfo{{11, CORRUPTED}, {15, DISCARDED}, {23, UP_TO_DATE}}},
},
},
hex("0102030405060708") +
hex("00000003") +
hex("00000001000000020000000b000000000000001100000001") +
hex("00000002000000010000000b00000002") +
hex("00000007000000030000000b000000040000000f000000030000001700000000"),
},
// map[Oid]struct {Tid,Tid,bool}
{&AnswerObjectUndoSerial{
ObjectTIDDict: map[zodb.Oid]struct {
CurrentSerial zodb.Tid
UndoSerial zodb.Tid
IsCurrent bool
}{
1: {1, 0, false},
2: {7, 1, true},
8: {7, 1, false},
5: {4, 3, true},
}},
u32(4) +
u64(1) + u64(1) + u64(0) + hex("00") +
u64(2) + u64(7) + u64(1) + hex("01") +
u64(5) + u64(4) + u64(3) + hex("01") +
u64(8) + u64(7) + u64(1) + hex("00"),
},
// map[uint32]UUID + trailing ...
{&CheckReplicas{
PartitionDict: map[uint32]NodeUUID{
1: 7,
2: 9,
7: 3,
4: 17,
},
MinTID: 23,
MaxTID: 128,
},
u32(4) +
u32(1) + u32(7) +
u32(2) + u32(9) +
u32(4) + u32(17) +
u32(7) + u32(3) +
u64(23) + u64(128),
},
// uint32, []uint32
{&PartitionCorrupted{7, []NodeUUID{1, 3, 9, 4}},
u32(7) + u32(4) + u32(1) + u32(3) + u32(9) + u32(4),
},
// uint32, Address, string, IdTime
{&RequestIdentification{CLIENT, 17, Address{"localhost", 7777}, "myname", 0.12345678},
u32(2) + u32(17) + u32(9) +
"localhost" + u16(7777) +
u32(6) + "myname" +
hex("3fbf9add1091c895"),
},
// IdTime, empty Address, int32
{&NotifyNodeInformation{1504466245.926185, []NodeInfo{
{CLIENT, Address{}, UUID(CLIENT, 1), RUNNING, 1504466245.925599}}},
hex("41d66b15517b469d") + u32(1) +
u32(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u32(2) +
hex("41d66b15517b3d04"),
},
// empty IdTime
{&NotifyNodeInformation{IdTimeNone, []NodeInfo{}}, hex("ffffffffffffffff") + hex("00000000")},
// TODO we need tests for:
// []varsize + trailing
// map[]varsize + trailing
}
for _, tt := range testv {
testMsgMarshal(t, tt.msg, tt.encoded)
}
}
// For all message types: same as testMsgMarshal but zero-values only.
// this way we additionally lightly check encode / decode overflow behaviour for all types.
func TestMsgMarshalAllOverflowLightly(t *testing.T) {
for _, typ := range msgTypeRegistry {
// zero-value for a type
msg := reflect.New(typ).Interface().(Msg)
l := msg.NEOMsgEncodedLen()
zerol := make([]byte, l)
// decoding will turn nil slice & map into empty allocated ones.
// we need it so that reflect.DeepEqual works for msg encode/decode comparison
n, err := msg.NEOMsgDecode(zerol)
if !(n == l && err == nil) {
t.Errorf("%v: zero-decode unexpected: %v, %v ; want %v, nil", typ, n, err, l)
}
testMsgMarshal(t, msg, string(zerol))
}
}
// Verify overflow handling on decode len checks
func TestMsgDecodeLenOverflow(t *testing.T) {
var testv = []struct {
msg Msg // of type to decode into
data string // []byte - tricky data to exercise decoder u32 len checks overflow
}{
// [] with sizeof(item) = 8 -> len*sizeof(item) = 0 if u32
{&AnswerTIDs{}, u32(0x20000000)},
// {} with sizeof(key) = 8, sizeof(value) = 8 -> len*sizeof(key+value) = 0 if u32
{&AnswerLockedTransactions{}, u32(0x10000000)},
}
for _, tt := range testv {
data := []byte(tt.data)
func() {
defer func() {
if e := recover(); e != nil {
t.Errorf("%T: decode: panic on %x", tt.msg, data)
}
}()
n, err := tt.msg.NEOMsgDecode(data)
if !(n == 0 && err == ErrDecodeOverflow) {
t.Errorf("%T: decode %x\nhave: %d, %v\nwant: %d, %v", tt.msg, data,
n, err, 0, ErrDecodeOverflow)
}
}()
}
}
func TestUUID(t *testing.T) {
var testv = []struct{typ NodeType; num int32; uuid uint32; str string}{
{STORAGE, 1, 0x00000001, "S1"},
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment