Commit e8143557 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/proto: MessagePack support for messages (draft)

This patch adds proto.Encoding('M') and teaches it to encode/decode
messages via MessagePack by the rules defined in

	nexedi/neoppod@9d0bf97a
	( nexedi/neoppod!11 )

It only adds support for messages serialization, without changing
proto.go to match changes in e.g. enums reordering, and without adding
support for MessagePack at link layer in neonet.

M-encoding was only tested for NEO/go-NEO/go, and was not yet tested for
NEO/go-NEO/py interoperation. There will be likely small mistakes
discovered on my side that should be hopefully easy to fix step by step
once we get to that phase.
parent a8249bd9
// Copyright (C) 2020-2021 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package proto
// runtime glue for msgpack support
import (
"fmt"
"github.com/tinylib/msgp/msgp"
"lab.nexedi.com/kirr/neo/go/neo/internal/msgpack"
)
// mstructDecodeError represents decode error when decoder was expecting
// tuple<nfield> for structure named path.
type mstructDecodeError struct {
path string // "Type.field.field"
op msgpack.Op // op we got
opOk msgpack.Op // op expected
}
func (e *mstructDecodeError) Error() string {
return fmt.Sprintf("decode: M: struct %s: got opcode %02x; expect %02x", e.path, e.op, e.opOk)
}
// mdecodeErr is called to normilize error when msgp.ReadXXX returns err when decoding path.
func mdecodeErr(path string, err error) error {
if err == msgp.ErrShortBytes {
return ErrDecodeOverflow
}
return &mdecodeError{path, err}
}
type mdecodeError struct {
path string // "Type.field.field"
err error
}
func (e *mdecodeError) Error() string {
return fmt.Sprintf("decode: M: %s: %s", e.path, e.err)
}
// mOpError represents decode error when decoder faces unexpected operation.
type mOpError struct {
op msgpack.Op // op we got
expectedv []msgpack.Op // expected was any op from expectedv
}
func (e *mOpError) Error() string {
return fmt.Sprintf("got opcode %02x; expected any from %02x", e.op, e.expectedv)
}
func mdecodeOpErr(path string, op msgpack.Op, expectedv ...msgpack.Op) error {
return mdecodeErr(path+"/op", &mOpError{op, expectedv})
}
// mLen8Error represents decode error when decoder faces unexpected length in Bin8.
type mLen8Error struct {
l, lOk byte // len we got and expected
}
func (e *mLen8Error) Error() string {
return fmt.Sprintf("expected length %d; got %d", e.lOk, e.l)
}
func mdecodeLen8Err(path string, l, lOk uint8) error {
return mdecodeErr(path+"/len", &mLen8Error{l, lOk})
}
func mdecodeEnumTypeErr(path string, enumType, enumTypeOk byte) error {
return mdecodeErr(path+"/enumType",
fmt.Errorf("expected %d; got %d", enumTypeOk, enumType))
}
func mdecodeEnumValueErr(path string, v byte) error {
return mdecodeErr(path, fmt.Errorf("invalid enum payload %02x", v))
}
...@@ -129,6 +129,10 @@ type Msg interface { ...@@ -129,6 +129,10 @@ type Msg interface {
neoMsgEncodeN(buf []byte) neoMsgEncodeN(buf []byte)
neoMsgDecodeN(data []byte) (nread int, err error) neoMsgDecodeN(data []byte) (nread int, err error)
// M encoding (via MessagePack)
neoMsgEncodedLenM() int
neoMsgEncodeM(buf []byte)
neoMsgDecodeM(data []byte) (nread int, err error)
} }
// Encoding represents messages encoding. // Encoding represents messages encoding.
...@@ -139,6 +143,7 @@ func (e Encoding) MsgEncodedLen(msg Msg) int { ...@@ -139,6 +143,7 @@ func (e Encoding) MsgEncodedLen(msg Msg) int {
switch e { switch e {
default: panic("bug") default: panic("bug")
case 'N': return msg.neoMsgEncodedLenN() case 'N': return msg.neoMsgEncodedLenN()
case 'M': return msg.neoMsgEncodedLenM()
} }
} }
...@@ -149,6 +154,7 @@ func (e Encoding) MsgEncode(msg Msg, buf []byte) { ...@@ -149,6 +154,7 @@ func (e Encoding) MsgEncode(msg Msg, buf []byte) {
switch e { switch e {
default: panic("bug") default: panic("bug")
case 'N': msg.neoMsgEncodeN(buf) case 'N': msg.neoMsgEncodeN(buf)
case 'M': msg.neoMsgEncodeM(buf)
} }
} }
...@@ -157,6 +163,7 @@ func (e Encoding) MsgDecode(msg Msg, data []byte) (nread int, err error) { ...@@ -157,6 +163,7 @@ func (e Encoding) MsgDecode(msg Msg, data []byte) (nread int, err error) {
switch e { switch e {
default: panic("bug") default: panic("bug")
case 'N': return msg.neoMsgDecodeN(data) case 'N': return msg.neoMsgDecodeN(data)
case 'M': return msg.neoMsgDecodeM(data)
} }
} }
...@@ -166,6 +173,7 @@ var ErrDecodeOverflow = errors.New("decode: buffer overflow") ...@@ -166,6 +173,7 @@ var ErrDecodeOverflow = errors.New("decode: buffer overflow")
// ---- messages ---- // ---- messages ----
//neo:proto enum
type ErrorCode uint32 type ErrorCode uint32
const ( const (
ACK ErrorCode = iota ACK ErrorCode = iota
...@@ -183,6 +191,7 @@ const ( ...@@ -183,6 +191,7 @@ const (
INCOMPLETE_TRANSACTION INCOMPLETE_TRANSACTION
) )
//neo:proto enum
type ClusterState int8 type ClusterState int8
const ( const (
// The cluster is initially in the RECOVERING state, and it goes back to // The cluster is initially in the RECOVERING state, and it goes back to
...@@ -216,6 +225,7 @@ const ( ...@@ -216,6 +225,7 @@ const (
STOPPING_BACKUP STOPPING_BACKUP
) )
//neo:proto enum
type NodeType int8 type NodeType int8
const ( const (
MASTER NodeType = iota MASTER NodeType = iota
...@@ -224,6 +234,7 @@ const ( ...@@ -224,6 +234,7 @@ const (
ADMIN ADMIN
) )
//neo:proto enum
type NodeState int8 type NodeState int8
const ( const (
UNKNOWN NodeState = iota //short: U // XXX tag prefix name ? UNKNOWN NodeState = iota //short: U // XXX tag prefix name ?
...@@ -232,6 +243,7 @@ const ( ...@@ -232,6 +243,7 @@ const (
PENDING //short: P PENDING //short: P
) )
//neo:proto enum
type CellState int8 type CellState int8
const ( const (
// Write-only cell. Last transactions are missing because storage is/was down // Write-only cell. Last transactions are missing because storage is/was down
......
...@@ -164,12 +164,19 @@ func TestMsgMarshal(t *testing.T) { ...@@ -164,12 +164,19 @@ func TestMsgMarshal(t *testing.T) {
var testv = []struct { var testv = []struct {
msg Msg msg Msg
encodedN string // []byte encodedN string // []byte
encodedM string // []byte
}{ }{
// empty // empty
{&Ping{}, ""}, {&Ping{},
"",
"\x90",
},
// uint32, string // uint32(N)/enum(M), string
{&Error{Code: 0x01020304, Message: "hello"}, "\x01\x02\x03\x04\x00\x00\x00\x05hello"}, {&Error{Code: 0x00000045, Message: "hello"},
"\x00\x00\x00\x45\x00\x00\x00\x05hello",
hex("92") + hex("d40045") + "\xc4\x05hello",
},
// Oid, Tid, bool, Checksum, []byte // Oid, Tid, bool, Checksum, []byte
{&StoreObject{ {&StoreObject{
...@@ -185,7 +192,18 @@ func TestMsgMarshal(t *testing.T) { ...@@ -185,7 +192,18 @@ func TestMsgMarshal(t *testing.T) {
hex("01020304050607080a0b0c0d0e0f010200") + hex("01020304050607080a0b0c0d0e0f010200") +
hex("0102030405060708090a0b0c0d0e0f1011121314") + hex("0102030405060708090a0b0c0d0e0f1011121314") +
hex("0000000b") + "hello world" + hex("0000000b") + "hello world" +
hex("0a0b0c0d0e0f01030a0b0c0d0e0f0104")}, hex("0a0b0c0d0e0f01030a0b0c0d0e0f0104"),
// M
hex("97") +
hex("c408") + hex("0102030405060708") +
hex("c408") + hex("0a0b0c0d0e0f0102") +
hex("c2") +
hex("c414") + hex("0102030405060708090a0b0c0d0e0f1011121314") +
hex("c40b") + "hello world" +
hex("c408") + hex("0a0b0c0d0e0f0103") +
hex("c408") + hex("0a0b0c0d0e0f0104"),
},
// PTid, [] (of [] of {UUID, CellState}) // PTid, [] (of [] of {UUID, CellState})
{&AnswerPartitionTable{ {&AnswerPartitionTable{
...@@ -205,6 +223,15 @@ func TestMsgMarshal(t *testing.T) { ...@@ -205,6 +223,15 @@ func TestMsgMarshal(t *testing.T) {
hex("000000020000000b010000001100") + hex("000000020000000b010000001100") +
hex("000000010000000b02") + hex("000000010000000b02") +
hex("000000030000000b030000000f040000001701"), hex("000000030000000b030000000f040000001701"),
// M
hex("93") +
hex("cf0102030405060708") +
hex("22") +
hex("93") +
hex("91"+"92"+"920bd40401"+"9211d40400") +
hex("91"+"91"+"920bd40402") +
hex("91"+"93"+"920bd40403"+"920fd40404"+"9217d40401"),
}, },
// map[Oid]struct {Tid,Tid,bool} // map[Oid]struct {Tid,Tid,bool}
...@@ -226,6 +253,14 @@ func TestMsgMarshal(t *testing.T) { ...@@ -226,6 +253,14 @@ func TestMsgMarshal(t *testing.T) {
u64(2) + u64(7) + u64(1) + hex("01") + u64(2) + u64(7) + u64(1) + hex("01") +
u64(5) + u64(4) + u64(3) + hex("01") + u64(5) + u64(4) + u64(3) + hex("01") +
u64(8) + u64(7) + u64(1) + hex("00"), u64(8) + u64(7) + u64(1) + hex("00"),
// M
hex("91") +
hex("84") +
hex("c408")+u64(1) + hex("93") + hex("c408")+u64(1) + hex("c408")+u64(0) + hex("c2") +
hex("c408")+u64(2) + hex("93") + hex("c408")+u64(7) + hex("c408")+u64(1) + hex("c3") +
hex("c408")+u64(5) + hex("93") + hex("c408")+u64(4) + hex("c408")+u64(3) + hex("c3") +
hex("c408")+u64(8) + hex("93") + hex("c408")+u64(7) + hex("c408")+u64(1) + hex("c2"),
}, },
// map[uint32]UUID + trailing ... // map[uint32]UUID + trailing ...
...@@ -247,12 +282,28 @@ func TestMsgMarshal(t *testing.T) { ...@@ -247,12 +282,28 @@ func TestMsgMarshal(t *testing.T) {
u32(4) + u32(17) + u32(4) + u32(17) +
u32(7) + u32(3) + u32(7) + u32(3) +
u64(23) + u64(128), u64(23) + u64(128),
// M
hex("93") +
hex("84") +
hex("01" + "07") +
hex("02" + "09") +
hex("04" + "11") +
hex("07" + "03") +
hex("c408") + u64(23) +
hex("c408") + u64(128),
}, },
// uint32, []uint32 // uint32, []uint32
{&PartitionCorrupted{7, []NodeUUID{1, 3, 9, 4}}, {&PartitionCorrupted{7, []NodeUUID{1, 3, 9, 4}},
// N // N
u32(7) + u32(4) + u32(1) + u32(3) + u32(9) + u32(4), u32(7) + u32(4) + u32(1) + u32(3) + u32(9) + u32(4),
// M
hex("92") +
hex("07") +
hex("94") +
hex("01030904"),
}, },
// uint32, Address, string, IdTime // uint32, Address, string, IdTime
...@@ -264,6 +315,16 @@ func TestMsgMarshal(t *testing.T) { ...@@ -264,6 +315,16 @@ func TestMsgMarshal(t *testing.T) {
hex("3fbf9add1091c895") + hex("3fbf9add1091c895") +
u32(2) + u32(5)+"room1" + u32(7)+"rack234" + u32(2) + u32(5)+"room1" + u32(7)+"rack234" +
u32(3) + u32(3)+u32(4)+u32(5), u32(3) + u32(3)+u32(4)+u32(5),
// M
hex("97") +
hex("d40202") +
hex("11") +
hex("92") + hex("c409")+"localhost" + hex("cd")+u16(7777) +
hex("c406")+"myname" +
hex("cb" + "3fbf9add1091c895") +
hex("92") + hex("c405")+"room1" + hex("c407")+"rack234" +
hex("93") + hex("030405"),
}, },
// IdTime, empty Address, int32 // IdTime, empty Address, int32
...@@ -273,12 +334,26 @@ func TestMsgMarshal(t *testing.T) { ...@@ -273,12 +334,26 @@ func TestMsgMarshal(t *testing.T) {
hex("41d66b15517b469d") + u32(1) + hex("41d66b15517b469d") + u32(1) +
u8(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u8(2) + u8(2) + u32(0) /* <- ø Address */ + hex("e0000001") + u8(2) +
hex("41d66b15517b3d04"), hex("41d66b15517b3d04"),
// M
hex("92") +
hex("cb" + "41d66b15517b469d") +
hex("91") +
hex("95") +
hex("d40202") +
hex("92" + "c400"+"" + "00") +
hex("d2" + "e0000001") +
hex("d40302") +
hex("cb" + "41d66b15517b3d04"),
}, },
// empty IdTime // empty IdTime
{&NotifyNodeInformation{IdTimeNone, []NodeInfo{}}, {&NotifyNodeInformation{IdTimeNone, []NodeInfo{}},
// N // N
hex("ffffffffffffffff") + hex("00000000"), hex("ffffffffffffffff") + hex("00000000"),
// M
hex("92") +
hex("cb" + "fff0000000000000") + // XXX nan/-inf not handled yet
hex("90"),
}, },
// TODO we need tests for: // TODO we need tests for:
...@@ -288,6 +363,7 @@ func TestMsgMarshal(t *testing.T) { ...@@ -288,6 +363,7 @@ func TestMsgMarshal(t *testing.T) {
for _, tt := range testv { for _, tt := range testv {
testMsgMarshal(t, 'N', tt.msg, tt.encodedN) testMsgMarshal(t, 'N', tt.msg, tt.encodedN)
testMsgMarshal(t, 'M', tt.msg, tt.encodedM)
} }
} }
...@@ -295,11 +371,14 @@ func TestMsgMarshal(t *testing.T) { ...@@ -295,11 +371,14 @@ func TestMsgMarshal(t *testing.T) {
// this way we additionally lightly check encode / decode overflow behaviour for all types. // this way we additionally lightly check encode / decode overflow behaviour for all types.
func TestMsgMarshalAllOverflowLightly(t *testing.T) { func TestMsgMarshalAllOverflowLightly(t *testing.T) {
for _, typ := range msgTypeRegistry { for _, typ := range msgTypeRegistry {
for _, enc := range []Encoding{'N'} { for _, enc := range []Encoding{'N', 'M'} {
// zero-value for a type // zero-value for a type
msg := reflect.New(typ).Interface().(Msg) msg := reflect.New(typ).Interface().(Msg)
l := enc.MsgEncodedLen(msg) l := enc.MsgEncodedLen(msg)
zerol := make([]byte, l) zerol := make([]byte, l)
if enc != 'N' { // M-encoding of zero-value is not all zeros
enc.MsgEncode(msg, zerol)
}
// decoding will turn nil slice & map into empty allocated ones. // decoding will turn nil slice & map into empty allocated ones.
// we need it so that reflect.DeepEqual works for msg encode/decode comparison // we need it so that reflect.DeepEqual works for msg encode/decode comparison
n, err := enc.MsgDecode(msg, zerol) n, err := enc.MsgDecode(msg, zerol)
...@@ -313,6 +392,7 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) { ...@@ -313,6 +392,7 @@ func TestMsgMarshalAllOverflowLightly(t *testing.T) {
} }
// Verify overflow handling on decodeN len checks // Verify overflow handling on decodeN len checks
// TODO + M-variants with big len too?
func TestMsgDecodeLenOverflowN(t *testing.T) { func TestMsgDecodeLenOverflowN(t *testing.T) {
enc := Encoding('N') enc := Encoding('N')
......
This diff is collapsed.
This diff is collapsed.
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment