Commit 46fb8ddd authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c5ea78d6
// Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package zeo
// marshalling of messages <-> wire encoding.
// see https://github.com/zopefoundation/ZEO/blob/5.2.1-20-gcb26281d/doc/protocol.rst
import (
"bytes"
"encoding/binary"
"fmt"
msgp "github.com/tinylib/msgp/msgp"
msgpack "github.com/shamaton/msgpack"
pickle "github.com/kisielk/og-rek"
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
)
// ---- message encode/decode ----
// pktEncode encodes message into raw packet.
func (zl *zLink) pktEncode(m msg) *pktBuf {
switch zl.encoding {
case 'Z': return pktEncodeZ(m)
case 'M': return pktEncodeM(m)
default: panic("bug")
}
}
// pktDecode decodes raw packet into message.
func (zl *zLink) pktDecode(pkb *pktBuf) (msg, error) {
switch zl.encoding {
case 'Z': return pktDecodeZ(pkb)
case 'M': return pktDecodeM(pkb)
default: panic("bug")
}
}
// pktEncodeZ encodes message into raw Z (pickle) packet.
func pktEncodeZ(m msg) *pktBuf {
pkb := allocPkb()
p := pickle.NewEncoder(pkb)
// tuple -> pickle.Tuple
arg := m.arg
tup, ok := arg.(tuple)
if ok {
arg = pickle.Tuple(tup)
}
err := p.Encode(pickle.Tuple{m.msgid, m.flags, m.method, arg})
if err != nil {
panic(err) // all our types are expected to be supported by pickle
}
return pkb
}
// pktEncodeM encodes message into raw M (msgpack) packet.
func pktEncodeM(m msg) *pktBuf {
pkb := allocPkb()
data := pkb.data
data = msgp.AppendArrayHeader(data, 4)
data = msgp.AppendInt64(data, m.msgid) // msgid
data = msgp.AppendInt64(data, int64(m.flags)) // flags
data = msgp.AppendString(data, m.method) // method
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
dataArg, err := msgpack.Encode(m.arg)
if err != nil {
panic(err) // all our types are expected to be supported by msgpack
}
data = append(data, dataArg...)
pkb.data = data
return pkb
}
// pktDecodeZ decodes raw Z (pickle) packet into message.
func pktDecodeZ(pkb *pktBuf) (msg, error) {
var m msg
// must be (msgid, False|0, ".reply", res)
d := pickle.NewDecoder(bytes.NewReader(pkb.Payload()))
xpkt, err := d.Decode()
if err != nil {
return m, err
}
tpkt, ok := xpkt.(pickle.Tuple) // XXX also list?
if !ok {
return m, derrf("got %T; expected tuple", xpkt)
}
if len(tpkt) != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", len(tpkt))
}
m.msgid, ok = pickletools.Xint64(tpkt[0])
if !ok {
return m, derrf("msgid: got %T; expected int", tpkt[0])
}
flags, ok := pickletools.Xint64(tpkt[1])
if !ok {
bflags, ok := tpkt[1].(bool)
if !ok {
return m, derrf("flags: got %T; expected int|bool", tpkt[1])
}
if bflags {
flags = 1
} // else: flags is already = 0
}
// XXX check flags are in range?
m.flags = msgFlags(flags)
m.method, ok = tpkt[2].(string)
if !ok {
return m, derrf(".%d: method: got %T; expected str", m.msgid, tpkt[2])
}
m.arg = tpkt[3]
return m, nil
}
// pktDecodeM decodes raw M (msgpack) packet into message.
func pktDecodeM(pkb *pktBuf) (msg, error) {
var m msg
b := pkb.Payload()
// must be (msgid, False|0, "method", arg)
l, b, err := msgp.ReadArrayHeaderBytes(b)
if err != nil {
return m, derrf("%s", err)
}
if l != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", l)
}
// msgid
v := int64(0)
switch t := msgp.NextType(b); t {
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int", t)
}
if err != nil {
return m, derrf("msgid: %s", err)
}
m.msgid = v
// flags
v = int64(0)
switch t := msgp.NextType(b); t {
case msgp.BoolType:
var x bool
x, b, err = msgp.ReadBoolBytes(b)
if x { v = 1 }
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int|bool", t)
}
if err != nil {
return m, derrf("flags: %s", err)
}
// XXX check flags are in range?
m.flags = msgFlags(v)
// method
s := ""
switch t := msgp.NextType(b); t {
case msgp.StrType:
s, b, err = msgp.ReadStringBytes(b)
case msgp.BinType:
var x []byte
x, b, err = msgp.ReadBytesZC(b)
s = string(x)
default:
err = fmt.Errorf("got %s; expected str|bin", t)
}
if err != nil {
return m, derrf(".%d: method: %s", m.msgid, err)
}
m.method = s
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
btail, err := msgp.Skip(b)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
if len(btail) != 0 {
return m, derrf(".%d: payload has extra data after message")
}
err = msgpack.Decode(b, &m.arg)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
return m, nil
}
func derrf(format string, argv ...interface{}) error {
return fmt.Errorf("decode: "+format, argv...)
}
// ---- encode/decode for data types ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func (zl *zLink) xuint64Unpack(xv interface{}) (uint64, bool) {
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: str|bytes
v, err := pickletools.Xstrbytes8(xv)
if err != nil {
return 0, false
}
return v, true
case 'M':
// msgpack decodes bytes as []byte (which corresponds to bytearray in pickle)
switch v := xv.(type) {
default:
return 0, false
case []byte:
if len(v) != 8 {
return 0, false
}
return binary.BigEndian.Uint64(v), true
}
}
}
// xuint64Pack packs v into big-endian 8-byte string
func (zl *zLink) xuint64Pack(v uint64) interface{} {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: -> str XXX do we need to emit bytes for py3? -> TODO yes, after switch to protocol=3
return mem.String(b[:])
case 'M':
// msgpack: -> bin
return b[:]
}
}
func (zl *zLink) tidPack(tid zodb.Tid) interface{} {
return zl.xuint64Pack(uint64(tid))
}
func (zl *zLink) oidPack(oid zodb.Oid) interface{} {
return zl.xuint64Pack(uint64(oid))
}
func (zl *zLink) tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := zl.xuint64Unpack(xv)
return zodb.Tid(v), ok
}
func (zl *zLink) oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := zl.xuint64Unpack(xv)
return zodb.Oid(v), ok
}
// asTuple tries to decode object as tuple. XXX
func (zl *zLink) asTuple(xt interface{}) (tuple, bool) {
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: tuples are represented by picklet.Tuple
t, ok := xt.(pickle.Tuple)
return tuple(t), ok
case 'M':
// msgpack: tuples are encoded as arrays; decoded as []interface{}
t, ok := xt.([]interface{})
return tuple(t), ok
}
}
// asBytes tries to decode object as raw bytes.
func (zl *zLink) asBytes(xb interface{}) ([]byte, bool) {
switch zl.encoding{
default:
panic("bug")
case 'Z':
// pickle: str|bytes
s, err := pickletools.Xstrbytes(xb)
if err != nil {
return nil, false
}
return mem.Bytes(s), true
case 'M':
// msgpack: bin
b, ok := xb.([]byte)
return b, ok
}
}
// asString tries to decode object as string.
func (zl *zLink) asString(xs interface{}) (string, bool) {
switch zl.encoding{
default:
panic("bug")
case 'Z':
// pickle: str
s, ok := xs.(string)
return s, ok
case 'M':
// msgpack: bin(from py2) | str(from py3)
switch s := xs.(type) {
case []byte:
return string(s), true
case string:
return s, true
default:
return "", false
}
}
}
...@@ -22,7 +22,6 @@ package zeo ...@@ -22,7 +22,6 @@ package zeo
import ( import (
"context" "context"
"encoding/binary"
"fmt" "fmt"
"log" "log"
"net/url" "net/url"
...@@ -35,7 +34,6 @@ import ( ...@@ -35,7 +34,6 @@ import (
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
) )
type zeo struct { type zeo struct {
...@@ -419,136 +417,3 @@ func (z *zeo) URL() string { ...@@ -419,136 +417,3 @@ func (z *zeo) URL() string {
func init() { func init() {
zodb.RegisterDriver("zeo", openByURL) zodb.RegisterDriver("zeo", openByURL)
} }
// ---- data conversion with unified interface for Z/M encoding ----
// xuint64Unpack tries to decode packed 8-byte string as bigendian uint64
func (zl *zLink) xuint64Unpack(xv interface{}) (uint64, bool) {
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: str|bytes
v, err := pickletools.Xstrbytes8(xv)
if err != nil {
return 0, false
}
return v, true
case 'M':
// msgpack decodes bytes as []byte (which corresponds to bytearray in pickle)
switch v := xv.(type) {
default:
return 0, false
case []byte:
if len(v) != 8 {
return 0, false
}
return binary.BigEndian.Uint64(v), true
}
}
}
// xuint64Pack packs v into big-endian 8-byte string
func (zl *zLink) xuint64Pack(v uint64) interface{} {
var b [8]byte
binary.BigEndian.PutUint64(b[:], v)
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: -> str XXX do we need to emit bytes for py3? -> TODO yes, after switch to protocol=3
return mem.String(b[:])
case 'M':
// msgpack: -> bin
return b[:]
}
}
func (zl *zLink) tidPack(tid zodb.Tid) interface{} {
return zl.xuint64Pack(uint64(tid))
}
func (zl *zLink) oidPack(oid zodb.Oid) interface{} {
return zl.xuint64Pack(uint64(oid))
}
func (zl *zLink) tidUnpack(xv interface{}) (zodb.Tid, bool) {
v, ok := zl.xuint64Unpack(xv)
return zodb.Tid(v), ok
}
func (zl *zLink) oidUnpack(xv interface{}) (zodb.Oid, bool) {
v, ok := zl.xuint64Unpack(xv)
return zodb.Oid(v), ok
}
// asTuple tries to decode object as tuple. XXX
func (zl *zLink) asTuple(xt interface{}) (tuple, bool) {
switch zl.encoding {
default:
panic("bug")
case 'Z':
// pickle: tuples are represented by picklet.Tuple
t, ok := xt.(pickle.Tuple)
return tuple(t), ok
case 'M':
// msgpack: tuples are encoded as arrays; decoded as []interface{}
t, ok := xt.([]interface{})
return tuple(t), ok
}
}
// asBytes tries to decode object as raw bytes.
func (zl *zLink) asBytes(xb interface{}) ([]byte, bool) {
switch zl.encoding{
default:
panic("bug")
case 'Z':
// pickle: str|bytes
s, err := pickletools.Xstrbytes(xb)
if err != nil {
return nil, false
}
return mem.Bytes(s), true
case 'M':
// msgpack: bin
b, ok := xb.([]byte)
return b, ok
}
}
// asString tries to decode object as string.
func (zl *zLink) asString(xs interface{}) (string, bool) {
switch zl.encoding{
default:
panic("bug")
case 'Z':
// pickle: str
s, ok := xs.(string)
return s, ok
case 'M':
// msgpack: bin(from py2) | str(from py3)
switch s := xs.(type) {
case []byte:
return string(s), true
case string:
return s, true
default:
return "", false
}
}
}
...@@ -21,7 +21,6 @@ package zeo ...@@ -21,7 +21,6 @@ package zeo
// RPC calls client<->server // RPC calls client<->server
import ( import (
"bytes"
"context" "context"
"encoding/binary" "encoding/binary"
"errors" "errors"
...@@ -31,20 +30,13 @@ import ( ...@@ -31,20 +30,13 @@ import (
"net" "net"
"sync" "sync"
msgp "github.com/tinylib/msgp/msgp"
msgpack "github.com/shamaton/msgpack"
pickle "github.com/kisielk/og-rek"
"github.com/someonegg/gocontainer/rbuf" "github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/go123/xbytes" "lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/zodb/internal/pickletools"
) )
const pktHeaderLen = 4
// we can speak this protocol versions // we can speak this protocol versions
var protoVersions = []string{ var protoVersions = []string{
"3101", // last in ZEO3 series "3101", // last in ZEO3 series
...@@ -180,7 +172,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error { ...@@ -180,7 +172,7 @@ func (zl *zLink) serveRecv1(pkb *pktBuf) error {
return nil return nil
} }
// tuple corresponds to py tuple. // tuple represents py tuple.
type tuple []interface{} type tuple []interface{}
// msg represents 1 message. // msg represents 1 message.
...@@ -197,203 +189,6 @@ const ( ...@@ -197,203 +189,6 @@ const (
msgExcept = 2 // exception was raised on remote side (ZEO5) msgExcept = 2 // exception was raised on remote side (ZEO5)
) )
func derrf(format string, argv ...interface{}) error {
return fmt.Errorf("decode: "+format, argv...)
}
// pktDecode decodes raw packet into message.
func (zl *zLink) pktDecode(pkb *pktBuf) (msg, error) {
switch zl.encoding {
case 'Z': return pktDecodeZ(pkb)
case 'M': return pktDecodeM(pkb)
default: panic("bug")
}
}
// pktEncode encodes message into raw packet.
func (zl *zLink) pktEncode(m msg) *pktBuf {
switch zl.encoding {
case 'Z': return pktEncodeZ(m)
case 'M': return pktEncodeM(m)
default: panic("bug")
}
}
// pktDecodeZ decodes raw Z (pickle) packet into message.
func pktDecodeZ(pkb *pktBuf) (msg, error) {
var m msg
// must be (msgid, False|0, ".reply", res)
d := pickle.NewDecoder(bytes.NewReader(pkb.Payload()))
xpkt, err := d.Decode()
if err != nil {
return m, err
}
tpkt, ok := xpkt.(pickle.Tuple) // XXX also list?
if !ok {
return m, derrf("got %T; expected tuple", xpkt)
}
if len(tpkt) != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", len(tpkt))
}
m.msgid, ok = pickletools.Xint64(tpkt[0])
if !ok {
return m, derrf("msgid: got %T; expected int", tpkt[0])
}
flags, ok := pickletools.Xint64(tpkt[1])
if !ok {
bflags, ok := tpkt[1].(bool)
if !ok {
return m, derrf("flags: got %T; expected int|bool", tpkt[1])
}
if bflags {
flags = 1
} // else: flags is already = 0
}
// XXX check flags are in range?
m.flags = msgFlags(flags)
m.method, ok = tpkt[2].(string)
if !ok {
return m, derrf(".%d: method: got %T; expected str", m.msgid, tpkt[2])
}
m.arg = tpkt[3] // XXX pickle.Tuple -> tuple
return m, nil
}
// pktDecodeM decodes raw M (msgpack) packet into message.
func pktDecodeM(pkb *pktBuf) (msg, error) {
var m msg
b := pkb.Payload()
// must be (msgid, False|0, "method", arg)
l, b, err := msgp.ReadArrayHeaderBytes(b)
if err != nil {
return m, derrf("%s", err)
}
if l != 4 {
return m, derrf("len(msg-tuple)=%d; expected 4", l)
}
// msgid
v := int64(0)
switch t := msgp.NextType(b); t {
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int", t)
}
if err != nil {
return m, derrf("msgid: %s", err)
}
m.msgid = v
// flags
v = int64(0)
switch t := msgp.NextType(b); t {
case msgp.BoolType:
var x bool
x, b, err = msgp.ReadBoolBytes(b)
if x { v = 1 }
case msgp.IntType:
v, b, err = msgp.ReadInt64Bytes(b)
case msgp.UintType:
var x uint64
x, b, err = msgp.ReadUint64Bytes(b)
v = int64(x)
default:
err = fmt.Errorf("got %s; expected int|bool", t)
}
if err != nil {
return m, derrf("flags: %s", err)
}
// XXX check flags are in range?
m.flags = msgFlags(v)
// method
s := ""
switch t := msgp.NextType(b); t {
case msgp.StrType:
s, b, err = msgp.ReadStringBytes(b)
case msgp.BinType:
var x []byte
x, b, err = msgp.ReadBytesZC(b)
s = string(x)
default:
err = fmt.Errorf("got %s; expected str|bin", t)
}
if err != nil {
return m, derrf(".%d: method: %s", m.msgid, err)
}
m.method = s
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
btail, err := msgp.Skip(b)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
if len(btail) != 0 {
return m, derrf(".%d: payload has extra data after message")
}
err = msgpack.Decode(b, &m.arg)
if err != nil {
return m, derrf(".%d: arg: %s", m.msgid, err)
}
return m, nil
}
// pktEncodeZ encodes message into raw Z (pickle) packet.
func pktEncodeZ(m msg) *pktBuf {
pkb := allocPkb()
p := pickle.NewEncoder(pkb)
// tuple -> pickle.Tuple
arg := m.arg
tup, ok := arg.(tuple)
if ok {
arg = pickle.Tuple(tup)
}
err := p.Encode(pickle.Tuple{m.msgid, m.flags, m.method, arg})
if err != nil {
panic(err) // all our types are expected to be supported by pickle
}
return pkb
}
// pktEncodeM encodes message into raw M (msgpack) packet.
func pktEncodeM(m msg) *pktBuf {
pkb := allocPkb()
data := pkb.data
data = msgp.AppendArrayHeader(data, 4)
data = msgp.AppendInt64(data, m.msgid) // msgid
data = msgp.AppendInt64(data, int64(m.flags)) // flags
data = msgp.AppendString(data, m.method) // method
// arg
// it is interface{} - use shamaton/msgpack since msgp does not handle
// arbitrary interfaces well.
dataArg, err := msgpack.Encode(m.arg)
if err != nil {
panic(err) // all our types are expected to be supported by msgpack
}
data = append(data, dataArg...)
pkb.data = data
return pkb
}
// Call makes 1 RPC call to server, waits for reply and returns it. // Call makes 1 RPC call to server, waits for reply and returns it.
func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) { func (zl *zLink) Call(ctx context.Context, method string, argv ...interface{}) (reply msg, _ error) {
...@@ -456,6 +251,9 @@ func (zl *zLink) RegisterMethod(method string, f func(arg interface{})) { ...@@ -456,6 +251,9 @@ func (zl *zLink) RegisterMethod(method string, f func(arg interface{})) {
// ---- raw IO ---- // ---- raw IO ----
// packet = {size(u32), data}
const pktHeaderLen = 4
// pktBuf is buffer with packet data. // pktBuf is buffer with packet data.
// //
// alloc via allocPkb and free via pkb.Free. // alloc via allocPkb and free via pkb.Free.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment