Commit 63d322b9 authored by Kirill Smelkov's avatar Kirill Smelkov

go/zodb/zeo: MsgPack support

ZEO5 adds way for messages to be encoded via either pickles or MessagePack.
However until now we were always using pickles.
Let's add msgpack support to be able to e.g. use wire encoding that
server prefers.

MsgPack support is almost fully localized in encoding.
We use tinylib/msgp runtime routines to decode/encode msg fields with known types,
and shamaton/msgpack to decode/encode msg.arg, which is interface{},
because msgp does not generally work for arbitrary reflections.

For msgpack=true, tests state is the same as with pickles: handshake
works, but load fails when verifying that Load returns correct error for
deleted object:

    TestLoad/py/msgpack=false: xtesting.go:272: load 0285cbacc06d3a4c:0000000000000007: returned err unexpected:
        have: /tmp/zeo170183943/1.fs.zeosock: load 0285cbacc06d3a4c:0000000000000007: 0000000000000007: no such object
        want: /tmp/zeo170183943/1.fs.zeosock: load 0285cbacc06d3a4c:0000000000000007: 0000000000000007: object was deleted @0285cbacc06d3a4c
    TestLoad/py/msgpack=false: xtesting.go:272: load 0285cbad858bf2e6:0000000000000006: returned err unexpected:
        have: /tmp/zeo170183943/1.fs.zeosock: load 0285cbad858bf2e6:0000000000000006: 0000000000000006: no such object
        want: /tmp/zeo170183943/1.fs.zeosock: load 0285cbad858bf2e6:0000000000000006: 0000000000000006: object was deleted @0285cbad858bf2e6
    TestLoad/py/msgpack=false: xtesting.go:290: load 7fffffffffffffff:0000000000000007: returned err unexpected:
        have: /tmp/zeo170183943/1.fs.zeosock: load 7fffffffffffffff:0000000000000007: 0000000000000007: no such object
        want: /tmp/zeo170183943/1.fs.zeosock: load 7fffffffffffffff:0000000000000007: 0000000000000007: object was deleted @0285cbacc06d3a4c
    TestLoad/py/msgpack=false: xtesting.go:290: load 7fffffffffffffff:0000000000000006: returned err unexpected:
        have: /tmp/zeo170183943/1.fs.zeosock: load 7fffffffffffffff:0000000000000006: 0000000000000006: no such object
        want: /tmp/zeo170183943/1.fs.zeosock: load 7fffffffffffffff:0000000000000006: 0000000000000006: object was deleted @0285cbad858bf2e6

    TestLoad/py/msgpack=true: xtesting.go:272: load 0285cbacc06d3a4c:0000000000000007: returned err unexpected:
        have: /tmp/zeo247652538/1.fs.zeosock: load 0285cbacc06d3a4c:0000000000000007: 0000000000000007: no such object
        want: /tmp/zeo247652538/1.fs.zeosock: load 0285cbacc06d3a4c:0000000000000007: 0000000000000007: object was deleted @0285cbacc06d3a4c
    TestLoad/py/msgpack=true: xtesting.go:272: load 0285cbad858bf2e6:0000000000000006: returned err unexpected:
        have: /tmp/zeo247652538/1.fs.zeosock: load 0285cbad858bf2e6:0000000000000006: 0000000000000006: no such object
        want: /tmp/zeo247652538/1.fs.zeosock: load 0285cbad858bf2e6:0000000000000006: 0000000000000006: object was deleted @0285cbad858bf2e6
    TestLoad/py/msgpack=true: xtesting.go:290: load 7fffffffffffffff:0000000000000007: returned err unexpected:
        have: /tmp/zeo247652538/1.fs.zeosock: load 7fffffffffffffff:0000000000000007: 0000000000000007: no such object
        want: /tmp/zeo247652538/1.fs.zeosock: load 7fffffffffffffff:0000000000000007: 0000000000000007: object was deleted @0285cbacc06d3a4c
    TestLoad/py/msgpack=true: xtesting.go:290: load 7fffffffffffffff:0000000000000006: returned err unexpected:
        have: /tmp/zeo247652538/1.fs.zeosock: load 7fffffffffffffff:0000000000000006: 0000000000000006: no such object
        want: /tmp/zeo247652538/1.fs.zeosock: load 7fffffffffffffff:0000000000000006: 0000000000000006: object was deleted @0285cbad858bf2e6

This is due to https://github.com/zopefoundation/ZODB/issues/318
parent f7543195
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
package zeo package zeo
// Protocol for exchanged ZEO messages. // Protocol for exchanged ZEO messages.
// On the wire messages are encoded via pickles. // On the wire messages are encoded via either pickles or msgpack.
// Each message is wrapped into packet with be32 header of whole packet size. // Each message is wrapped into packet with be32 header of whole packet size.
// See https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst for details. // See https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst for details.
...@@ -28,6 +28,8 @@ import ( ...@@ -28,6 +28,8 @@ import (
"encoding/binary" "encoding/binary"
"fmt" "fmt"
msgp "github.com/tinylib/msgp/msgp"
msgpack "github.com/shamaton/msgpack"
pickle "github.com/kisielk/og-rek" pickle "github.com/kisielk/og-rek"
"lab.nexedi.com/kirr/go123/mem" "lab.nexedi.com/kirr/go123/mem"
...@@ -56,7 +58,7 @@ const ( ...@@ -56,7 +58,7 @@ const (
) )
// encoding represents messages encoding. // encoding represents messages encoding.
type encoding byte // Z - pickles type encoding byte // Z - pickles, M - msgpack
// ---- message encode/decode ↔ packet ---- // ---- message encode/decode ↔ packet ----
...@@ -64,6 +66,7 @@ type encoding byte // Z - pickles ...@@ -64,6 +66,7 @@ type encoding byte // Z - pickles
func (e encoding) pktEncode(m msg) *pktBuf { func (e encoding) pktEncode(m msg) *pktBuf {
switch e { switch e {
case 'Z': return pktEncodeZ(m) case 'Z': return pktEncodeZ(m)
case 'M': return pktEncodeM(m)
default: panic("bug") default: panic("bug")
} }
} }
...@@ -72,6 +75,7 @@ func (e encoding) pktEncode(m msg) *pktBuf { ...@@ -72,6 +75,7 @@ func (e encoding) pktEncode(m msg) *pktBuf {
func (e encoding) pktDecode(pkb *pktBuf) (msg, error) { func (e encoding) pktDecode(pkb *pktBuf) (msg, error) {
switch e { switch e {
case 'Z': return pktDecodeZ(pkb) case 'Z': return pktDecodeZ(pkb)
case 'M': return pktDecodeM(pkb)
default: panic("bug") default: panic("bug")
} }
} }
...@@ -88,6 +92,29 @@ func pktEncodeZ(m msg) *pktBuf { ...@@ -88,6 +92,29 @@ func pktEncodeZ(m msg) *pktBuf {
return pkb return pkb
} }
// pktEncodeM encodes message into raw M (msgpack) packet.
func pktEncodeM(m msg) *pktBuf {
pkb := allocPkb()
data := pkb.data
data = msgp.AppendArrayHeader(data, 4)
data = msgp.AppendInt64(data, m.msgid) // msgid
data = msgp.AppendInt64(data, int64(m.flags)) // flags
data = msgp.AppendString(data, m.method) // method
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
dataArg, err := msgpack.Encode(m.arg)
if err != nil {
panic(err) // all our types are expected to be supported by msgpack
}
data = append(data, dataArg...)
pkb.data = data
return pkb
}
// pktDecodeZ decodes raw Z (pickle) packet into message. // pktDecodeZ decodes raw Z (pickle) packet into message.
func pktDecodeZ(pkb *pktBuf) (msg, error) { func pktDecodeZ(pkb *pktBuf) (msg, error) {
var m msg var m msg
...@@ -133,6 +160,94 @@ func pktDecodeZ(pkb *pktBuf) (msg, error) { ...@@ -133,6 +160,94 @@ func pktDecodeZ(pkb *pktBuf) (msg, error) {
return m, nil return m, nil
} }
// pktDecodeM decodes raw M (msgpack) packet into message.
func pktDecodeM(pkb *pktBuf) (msg, error) {
var m msg
b := pkb.Payload()
// must be (msgid, False|0, "method", arg)
l, b, err := msgp.ReadArrayHeaderBytes(b)
if err != nil {
return m, derrf("%s", err)
}
if l != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", l)
}
// msgid
v := int64(0)
switch t := msgp.NextType(b); t {
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int", t)
}
if err != nil {
return m, derrf("msgid: %s", err)
}
m.msgid = v
// flags
v = int64(0)
switch t := msgp.NextType(b); t {
case msgp.BoolType:
var x bool
x, b, err = msgp.ReadBoolBytes(b)
if x { v = 1 }
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int|bool", t)
}
if err != nil {
return m, derrf("flags: %s", err)
}
// XXX check flags are in range?
m.flags = msgFlags(v)
// method
s := ""
switch t := msgp.NextType(b); t {
case msgp.StrType:
s, b, err = msgp.ReadStringBytes(b)
case msgp.BinType:
var x []byte
x, b, err = msgp.ReadBytesZC(b)
s = string(x)
default:
err = fmt.Errorf("got %s; expected str|bin", t)
}
if err != nil {
return m, derrf(".%d: method: %s", m.msgid, err)
}
m.method = s
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
btail, err := msgp.Skip(b)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
if len(btail) != 0 {
return m, derrf(".%d: payload has extra data after message")
}
err = msgpack.Decode(b, &m.arg)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
return m, nil
}
func derrf(format string, argv ...interface{}) error { func derrf(format string, argv ...interface{}) error {
return fmt.Errorf("decode: "+format, argv...) return fmt.Errorf("decode: "+format, argv...)
...@@ -145,7 +260,7 @@ func derrf(format string, argv ...interface{}) error { ...@@ -145,7 +260,7 @@ func derrf(format string, argv ...interface{}) error {
type tuple []interface{} type tuple []interface{}
// Tuple converts t into corresponding object appropriate for encoding e. // Tuple converts t into corresponding object appropriate for encoding e.
func (e encoding) Tuple(t tuple) pickle.Tuple { func (e encoding) Tuple(t tuple) interface{} {
switch e { switch e {
default: default:
panic("bug") panic("bug")
...@@ -153,6 +268,15 @@ func (e encoding) Tuple(t tuple) pickle.Tuple { ...@@ -153,6 +268,15 @@ func (e encoding) Tuple(t tuple) pickle.Tuple {
case 'Z': case 'Z':
// pickle: -> pickle.Tuple // pickle: -> pickle.Tuple
return pickle.Tuple(t) return pickle.Tuple(t)
case 'M':
// msgpack: -> leave as tuple
// However shamaton/msgpack encodes tuple(nil) as nil, not empty tuple,
// so nil -> tuple{}.
if t == nil {
t = tuple{}
}
return t
} }
} }
...@@ -172,6 +296,11 @@ func (e encoding) asTuple(xt interface{}) (tuple, bool) { ...@@ -172,6 +296,11 @@ func (e encoding) asTuple(xt interface{}) (tuple, bool) {
default: default:
return tuple(nil), false return tuple(nil), false
} }
case 'M':
// msgpack: tuples/lists are encoded as arrays; decoded as []interface{}
t, ok := xt.([]interface{})
return tuple(t), ok
} }
} }
...@@ -189,11 +318,24 @@ func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) { ...@@ -189,11 +318,24 @@ func (e encoding) xuint64Unpack(xv interface{}) (uint64, bool) {
return 0, false return 0, false
} }
return v, true return v, true
case 'M':
// msgpack decodes bytes as []byte (which corresponds to bytearray in pickle)
switch v := xv.(type) {
default:
return 0, false
case []byte:
if len(v) != 8 {
return 0, false
}
return binary.BigEndian.Uint64(v), true
}
} }
} }
// xuint64Pack packs v into big-endian 8-byte string // xuint64Pack packs v into big-endian 8-byte string
func (e encoding) xuint64Pack(v uint64) string { func (e encoding) xuint64Pack(v uint64) interface{} {
var b [8]byte var b [8]byte
binary.BigEndian.PutUint64(b[:], v) binary.BigEndian.PutUint64(b[:], v)
...@@ -204,16 +346,20 @@ func (e encoding) xuint64Pack(v uint64) string { ...@@ -204,16 +346,20 @@ func (e encoding) xuint64Pack(v uint64) string {
case 'Z': case 'Z':
// pickle: -> str XXX do we need to emit bytes instead of str? // pickle: -> str XXX do we need to emit bytes instead of str?
return mem.String(b[:]) return mem.String(b[:])
case 'M':
// msgpack: -> bin
return b[:]
} }
} }
// Tid converts tid into corresponding object appropriate for encoding e. // Tid converts tid into corresponding object appropriate for encoding e.
func (e encoding) Tid(tid zodb.Tid) string { func (e encoding) Tid(tid zodb.Tid) interface{} {
return e.xuint64Pack(uint64(tid)) return e.xuint64Pack(uint64(tid))
} }
// Oid converts oid into corresponding object appropriate for encoding e. // Oid converts oid into corresponding object appropriate for encoding e.
func (e encoding) Oid(oid zodb.Oid) string { func (e encoding) Oid(oid zodb.Oid) interface{} {
return e.xuint64Pack(uint64(oid)) return e.xuint64Pack(uint64(oid))
} }
...@@ -243,6 +389,11 @@ func (e encoding) asBytes(xb interface{}) ([]byte, bool) { ...@@ -243,6 +389,11 @@ func (e encoding) asBytes(xb interface{}) ([]byte, bool) {
return nil, false return nil, false
} }
return mem.Bytes(s), true return mem.Bytes(s), true
case 'M':
// msgpack: bin
b, ok := xb.([]byte)
return b, ok
} }
} }
...@@ -256,5 +407,16 @@ func (e encoding) asString(xs interface{}) (string, bool) { ...@@ -256,5 +407,16 @@ func (e encoding) asString(xs interface{}) (string, bool) {
// pickle: str // pickle: str
s, ok := xs.(string) s, ok := xs.(string)
return s, ok return s, ok
case 'M':
// msgpack: bin(from py2) | str(from py3)
switch s := xs.(type) {
case []byte:
return string(s), true
case string:
return s, true
default:
return "", false
}
} }
} }
...@@ -223,6 +223,11 @@ func (r rpc) zeo5Error(arg interface{}) error { ...@@ -223,6 +223,11 @@ func (r rpc) zeo5Error(arg interface{}) error {
// //
// nil is returned if arg does not represent an exception. // nil is returned if arg does not represent an exception.
func (r rpc) zeo4Error(arg interface{}) error { func (r rpc) zeo4Error(arg interface{}) error {
// in non-pickle encodings errors are always indicated via msgExcept flag
if r.zlink.enc != 'Z' {
return nil
}
// (exc_class, exc_inst), e.g. // (exc_class, exc_inst), e.g.
// ogórek.Tuple{ // ogórek.Tuple{
// ogórek.Class{Module:"ZODB.POSException", Name:"POSKeyError"}, // ogórek.Class{Module:"ZODB.POSException", Name:"POSKeyError"},
......
...@@ -21,6 +21,7 @@ package zeo ...@@ -21,6 +21,7 @@ package zeo
import ( import (
"context" "context"
"fmt"
"io/ioutil" "io/ioutil"
"net/url" "net/url"
"os" "os"
...@@ -41,6 +42,8 @@ import ( ...@@ -41,6 +42,8 @@ import (
type ZEOSrv interface { type ZEOSrv interface {
Addr() string // unix-socket address of the server Addr() string // unix-socket address of the server
Close() error Close() error
Encoding() encoding // encoding used on the wire - 'M' or 'Z'
} }
// ZEOPySrv represents running ZEO/py server. // ZEOPySrv represents running ZEO/py server.
...@@ -56,6 +59,7 @@ type ZEOPySrv struct { ...@@ -56,6 +59,7 @@ type ZEOPySrv struct {
} }
type ZEOPyOptions struct { type ZEOPyOptions struct {
msgpack bool // whether to advertise msgpack
} }
// StartZEOPySrv starts ZEO/py server for FileStorage database located at fs1path. // StartZEOPySrv starts ZEO/py server for FileStorage database located at fs1path.
...@@ -67,6 +71,11 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) { ...@@ -67,6 +71,11 @@ func StartZEOPySrv(fs1path string, opt ZEOPyOptions) (_ *ZEOPySrv, err error) {
z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})} z := &ZEOPySrv{fs1path: fs1path, cancel: cancel, done: make(chan struct{})}
z.pysrv = exec.CommandContext(ctx, "python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.Addr()) z.pysrv = exec.CommandContext(ctx, "python", "-m", "ZEO.runzeo", "-f", fs1path, "-a", z.Addr())
z.opt = opt z.opt = opt
msgpack := ""
if opt.msgpack {
msgpack = "y"
}
z.pysrv.Env = append(os.Environ(), "ZEO_MSGPACK="+msgpack)
z.pysrv.Stdin = nil z.pysrv.Stdin = nil
z.pysrv.Stdout = os.Stdout z.pysrv.Stdout = os.Stdout
z.pysrv.Stderr = os.Stderr z.pysrv.Stderr = os.Stderr
...@@ -125,6 +134,12 @@ func (z *ZEOPySrv) Close() (err error) { ...@@ -125,6 +134,12 @@ func (z *ZEOPySrv) Close() (err error) {
return err return err
} }
func (z *ZEOPySrv) Encoding() encoding {
enc := encoding('Z')
if z.opt.msgpack { enc = encoding('M') }
return enc
}
// ---------------- // ----------------
...@@ -161,21 +176,27 @@ func withZEOSrv(t *testing.T, f func(t *testing.T, zsrv ZEOSrv), optv ...tOption ...@@ -161,21 +176,27 @@ func withZEOSrv(t *testing.T, f func(t *testing.T, zsrv ZEOSrv), optv ...tOption
f(fs1path) f(fs1path)
} }
// ZEO/py for _, msgpack := range []bool{false, true} {
t.Run("py", func(t *testing.T) { // ZEO/py
t.Helper() t.Run(fmt.Sprintf("py/msgpack=%v", msgpack), func(t *testing.T) {
xtesting.NeedPy(t, "ZEO") t.Helper()
withFS1(t, func(fs1path string) { needpy := []string{"ZEO"}
X := xtesting.FatalIf(t) if msgpack {
needpy = append(needpy, "msgpack")
zpy, err := StartZEOPySrv(fs1path, ZEOPyOptions{}); X(err) }
defer func() { xtesting.NeedPy(t, needpy...)
err := zpy.Close(); X(err) withFS1(t, func(fs1path string) {
}() X := xtesting.FatalIf(t)
f(t, zpy) zpy, err := StartZEOPySrv(fs1path, ZEOPyOptions{msgpack: msgpack}); X(err)
defer func() {
err := zpy.Close(); X(err)
}()
f(t, zpy)
})
}) })
}) }
} }
// withZEO tests f on all kinds of ZEO servers connected to by ZEO client. // withZEO tests f on all kinds of ZEO servers connected to by ZEO client.
...@@ -203,7 +224,10 @@ func TestHandshake(t *testing.T) { ...@@ -203,7 +224,10 @@ func TestHandshake(t *testing.T) {
err := zlink.Close(); X(err) err := zlink.Close(); X(err)
}() }()
// conntected ok ewant := zsrv.Encoding()
if zlink.enc != ewant {
t.Fatalf("handshake: encoding=%c ; want %c", zlink.enc, ewant)
}
}) })
} }
......
...@@ -67,7 +67,7 @@ type zLink struct { ...@@ -67,7 +67,7 @@ type zLink struct {
errDown error // error with which the link was shut down errDown error // error with which the link was shut down
ver string // protocol version in use (without "Z" or "M" prefix) ver string // protocol version in use (without "Z" or "M" prefix)
enc encoding // protocol encoding in use (always 'Z') enc encoding // protocol encoding in use ('Z' or 'M')
} }
// (called after handshake) // (called after handshake)
...@@ -375,6 +375,8 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -375,6 +375,8 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
// server first announces its preferred protocol // server first announces its preferred protocol
// it is e.g. "M5", "Z5", "Z4", "Z3101", ... // it is e.g. "M5", "Z5", "Z4", "Z3101", ...
//
// first letter is preferred encoding: 'M' (msgpack), or 'Z' (pickles).
pkb, err := zl.recvPkt() pkb, err := zl.recvPkt()
if err != nil { if err != nil {
return fmt.Errorf("rx: %s", err) return fmt.Errorf("rx: %s", err)
...@@ -386,9 +388,8 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) { ...@@ -386,9 +388,8 @@ func handshake(ctx context.Context, conn net.Conn) (_ *zLink, err error) {
return fmt.Errorf("rx: invalid peer handshake: %q", proto) return fmt.Errorf("rx: invalid peer handshake: %q", proto)
} }
// even if server announced it prefers 'M' (msgpack) it will // use wire encoding preferred by server
// accept 'Z' (pickles) as encoding. We always use 'Z'. enc := encoding(proto[0])
enc := encoding('Z')
// extract peer version from protocol string and choose actual // extract peer version from protocol string and choose actual
// version to use as min(peer, mybest) // version to use as min(peer, mybest)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment