Commit 65ff3185 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c6dda6c2
......@@ -200,7 +200,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
}
// shutdown closes peerLink and marks NodeLink as no longer operational
// it also shutdowns and all opened connections over this node link.
// it also shutdowns all opened connections over this node link.
func (nl *NodeLink) shutdown() {
nl.downOnce.Do(func() {
close(nl.down)
......
......@@ -67,7 +67,7 @@ type Master struct {
type nodeCome struct {
link *NodeLink
idReq RequestIdentification // we received this identification request
idResp chan NEOEncoder // what we reply (AcceptIdentification | Error)
idResp chan NEOPkt // what we reply (AcceptIdentification | Error)
}
// node disconnects
......@@ -704,7 +704,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
}
// convey identification request to master
idRespCh := make(chan NEOEncoder)
idRespCh := make(chan NEOPkt)
m.nodeCome <- nodeCome{link, idReq, idRespCh}
idResp := <-idRespCh
......@@ -760,7 +760,7 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
m.stateMu.Unlock()
go func() {
var pkt NEOEncoder
var pkt NEOPkt
for {
select {
......
......@@ -25,11 +25,12 @@ import (
// TODO organize rx buffers management (freelist etc)
// Buffer with packet data
// PktBuf is a buffer with full raw packet (header + data)
type PktBuf struct {
Data []byte // whole packet data including all headers XXX -> Buf ?
}
// PktHead represents header of a raw packet
// XXX naming -> PktHeader ?
type PktHead struct {
ConnId be32 // NOTE is .msgid in py
......@@ -37,19 +38,20 @@ type PktHead struct {
MsgLen be32 // payload message length (excluding packet header)
}
// Get pointer to packet header
// Header returns pointer to packet header
func (pkt *PktBuf) Header() *PktHead {
// XXX check len(Data) < PktHead ? -> no, Data has to be allocated with cap >= PktHeadLen
return (*PktHead)(unsafe.Pointer(&pkt.Data[0]))
}
// Get packet payload
// Payload returns []byte representing packet payload
func (pkt *PktBuf) Payload() []byte {
return pkt.Data[PktHeadLen:]
}
// packet dumping
// Strings dumps a packet
// XXX -> use .Dump() for full dump?
func (pkt *PktBuf) String() string {
if len(pkt.Data) < PktHeadLen {
return fmt.Sprintf("(! < PktHeadLen) % x", pkt.Data)
......
This diff is collapsed.
......@@ -137,29 +137,42 @@ type NodeUUID int32
var ErrDecodeOverflow = errors.New("decode: bufer overflow")
// NEOEncoder is interface for marshaling packets to wire format
type NEOEncoder interface {
// NEOEncodedInfo returns message code needed to be used for the packet
// on the wire and how much space is needed to encode payload
// XXX naming?
NEOEncodedInfo() (msgCode uint16, payloadLen int)
// NEOPkt is the interface implemented by packets to marshal/unmarshal them into/from wire format
type NEOPkt interface {
// NEOPktMsgCode returns message code needed to be used for particular packet type
// on the wire
NEOPktMsgCode() uint16
// NEOEncode performs the encoding.
// len(buf) must be >= payloadLen returned by NEOEncodedInfo
NEOEncode(buf []byte)
// NEOPktEncodedLen returns how much space is needed to encode current state
NEOPktEncodedLen() int
// NEOPktEncode encodes current state into buf.
// len(buf) must be >= NEOPktEncodedLen()
NEOPktEncode(buf []byte)
// NEOPktDecode decodes data into.
NEOPktDecode(data []byte) (nread int, err error)
}
// NEODecoder is interface for unmarshalling packets from wire format
type NEODecoder interface {
NEODecode(data []byte) (nread int, err error)
/*
// XXX do we need to keep it splitted as encoder/decoder ?
// NEOPktDecoder is the interface implemented by packets to unmarshal them from wire format
type NEOPktDecoder interface {
// NEOPktMsgCode returns message code that must have been used on the wire for this packet
NEOPktMsgCode() uint16
}
// NEOCodec is interface combining NEOEncoder & NEODecoder
// NEOPkt is interface combining NEOPktEncoder & NEOPktDecoder
// in particular it covers all NEO packets
type NEOCodec interface {
NEOEncoder
NEODecoder
type NEOPkt interface {
NEOPktEncoder
NEOPktDecoder
// XXX is in both encoder and decoder
NEOPktMsgCode() uint16
}
*/
type Address struct {
Host string
......@@ -167,7 +180,7 @@ type Address struct {
}
// NOTE if Host == "" -> Port not added to wire (see py.PAddress):
// func (a *Address) NEOEncode(b []byte) int {
// func (a *Address) NEOPktEncode(b []byte) int {
// n := string_NEOEncode(a.Host, b[0:])
// if a.Host != "" {
// BigEndian.PutUint16(b[n:], a.Port)
......
......@@ -69,9 +69,9 @@ func TestPktHeader(t *testing.T) {
}
// test marshalling for one packet type
func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
func testPktMarshal(t *testing.T, pkt NEOPkt, encoded string) {
typ := reflect.TypeOf(pkt).Elem() // type of *pkt
pkt2 := reflect.New(typ).Interface().(NEOCodec)
pkt2 := reflect.New(typ).Interface().(NEOPkt)
defer func() {
if e := recover(); e != nil {
t.Errorf("%v: panic ↓↓↓:", typ)
......@@ -80,7 +80,8 @@ func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
}()
// pkt.encode() == expected
msgCode, n := pkt.NEOEncodedInfo()
msgCode := pkt.NEOPktMsgCode()
n := pkt.NEOPktEncodedLen()
msgType := pktTypeRegistry[msgCode]
if msgType != typ {
t.Errorf("%v: msgCode = %v which corresponds to %v", typ, msgCode, msgType)
......@@ -90,7 +91,7 @@ func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
}
buf := make([]byte, n)
pkt.NEOEncode(buf)
pkt.NEOPktEncode(buf)
if string(buf) != encoded {
t.Errorf("%v: encode result unexpected:", typ)
t.Errorf("\thave: %s", hexpkg.EncodeToString(buf))
......@@ -120,13 +121,13 @@ func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
}
}()
pkt.NEOEncode(buf[:l])
pkt.NEOPktEncode(buf[:l])
}()
}
// pkt.decode() == expected
data := []byte(encoded + "noise")
n, err := pkt2.NEODecode(data)
n, err := pkt2.NEOPktDecode(data)
if err != nil {
t.Errorf("%v: decode error %v", typ, err)
}
......@@ -140,7 +141,7 @@ func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
// decode must detect buffer overflow
for l := len(encoded)-1; l >= 0; l-- {
n, err = pkt2.NEODecode(data[:l])
n, err = pkt2.NEOPktDecode(data[:l])
if !(n==0 && err==ErrDecodeOverflow) {
t.Errorf("%v: decode overflow not detected on [:%v]", typ, l)
}
......@@ -151,7 +152,7 @@ func testPktMarshal(t *testing.T, pkt NEOCodec, encoded string) {
// test encoding/decoding of packets
func TestPktMarshal(t *testing.T) {
var testv = []struct {
pkt NEOCodec
pkt NEOPkt
encoded string // []byte
} {
// empty
......@@ -267,12 +268,12 @@ func TestPktMarshal(t *testing.T) {
func TestPktMarshalAllOverflowLightly(t *testing.T) {
for _, typ := range pktTypeRegistry {
// zero-value for a type
pkt := reflect.New(typ).Interface().(NEOCodec)
_, l := pkt.NEOEncodedInfo()
pkt := reflect.New(typ).Interface().(NEOPkt)
l := pkt.NEOPktEncodedLen()
zerol := make([]byte, l)
// decoding will turn nil slice & map into empty allocated ones.
// we need it so that reflect.DeepEqual works for pkt encode/decode comparison
n, err := pkt.NEODecode(zerol)
n, err := pkt.NEOPktDecode(zerol)
if !(n == l && err == nil) {
t.Errorf("%v: zero-decode unexpected: %v, %v ; want %v, nil", typ, n, err, l)
}
......
......@@ -21,12 +21,12 @@
NEO. Protocol module. Code generator
This program generates marshalling code for packet types defined in proto.go .
For every type 3 methods are generated in accordance with NEOEncoder and
NEODecoder interfaces:
For every type 4 methods are generated in accordance with NEOPkt interface:
NEOEncodedInfo() (msgCode uint16, payloadLen int)
NEOEncode(buf []byte)
NEODecode(data []byte) (nread int, err error)
NEOPktMsgCode() uint16
NEOPktEncodedLen() int
NEOPktEncode(buf []byte)
NEOPktDecode(data []byte) (nread int, err error)
List of packet types is obtained via searching through proto.go AST - looking
for appropriate struct declarations there.
......@@ -197,7 +197,11 @@ import (
case *ast.StructType:
fmt.Fprintf(&buf, "// %d. %s\n\n", pktCode, typename)
buf.WriteString(generateCodecCode(typespec, &sizer{msgCode: pktCode}))
buf.emit("func (_ *%s) NEOPktMsgCode() uint16 {", typename)
buf.emit("return %d", pktCode)
buf.emit("}\n")
buf.WriteString(generateCodecCode(typespec, &sizer{}))
buf.WriteString(generateCodecCode(typespec, &encoder{}))
buf.WriteString(generateCodecCode(typespec, &decoder{}))
......@@ -460,17 +464,13 @@ func (o *OverflowCheck) AddExpr(format string, a ...interface{}) {
type sizer struct {
commonCodeGen
size SymSize // currently accumulated packet size
// which code to also return as packet msgCode
// (sizer does not compute this - it is emitted as-is given by caller)
msgCode int
}
// encoder generates code to encode a packet
//
// when type is recursively walked, for every case code to update `data[n:]` is generated.
// no overflow checks are generated as by NEOEncoder interface provided data
// buffer should have at least payloadLen length returned by NEOEncodedInfo()
// no overflow checks are generated as by NEOPkt interface provided data
// buffer should have at least payloadLen length returned by NEOPktEncodedInfo()
// (the size computed by sizer).
//
// the code emitted looks like:
......@@ -479,7 +479,7 @@ type sizer struct {
// encode<typ2>(data[n2:], path2)
// ...
//
// TODO encode have to care in NEOEncode to emit preambule such that bound
// TODO encode have to care in NEOPktEncode to emit preambule such that bound
// checking is performed only once (currenty compiler emits many of them)
type encoder struct {
commonCodeGen
......@@ -527,7 +527,7 @@ var _ CodeGenerator = (*decoder)(nil)
func (s *sizer) generatedCode() string {
code := Buffer{}
// prologue
code.emit("func (%s *%s) NEOEncodedInfo() (uint16, int) {", s.recvName, s.typeName)
code.emit("func (%s *%s) NEOPktEncodedLen() int {", s.recvName, s.typeName)
if s.varUsed["size"] {
code.emit("var %s int", s.var_("size"))
}
......@@ -539,7 +539,7 @@ func (s *sizer) generatedCode() string {
if s.varUsed["size"] {
size += " + " + s.var_("size")
}
code.emit("return %v, %v", s.msgCode, size)
code.emit("return %v", size)
code.emit("}\n")
return code.String()
......@@ -548,7 +548,7 @@ func (s *sizer) generatedCode() string {
func (e *encoder) generatedCode() string {
code := Buffer{}
// prologue
code.emit("func (%s *%s) NEOEncode(data []byte) {", e.recvName, e.typeName)
code.emit("func (%s *%s) NEOPktEncode(data []byte) {", e.recvName, e.typeName)
code.Write(e.buf.Bytes())
......@@ -655,7 +655,7 @@ func (d *decoder) generatedCode() string {
code := Buffer{}
// prologue
code.emit("func (%s *%s) NEODecode(data []byte) (int, error) {", d.recvName, d.typeName)
code.emit("func (%s *%s) NEOPktDecode(data []byte) (int, error) {", d.recvName, d.typeName)
if d.varUsed["nread"] {
code.emit("var %v uint32", d.var_("nread"))
}
......
......@@ -94,6 +94,7 @@ func ListenAndServe(ctx context.Context, net Network, laddr string, srv Server)
// IdentifyPeer identifies peer on the link
// it expects peer to send RequestIdentification packet and replies with AcceptIdentification if identification passes.
// returns information about identified node or error.
// XXX recheck identification logic here
func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
defer xerr.Contextf(&err, "%s: identify", link)
......@@ -137,6 +138,7 @@ func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentifi
// IdentifyWith identifies local node with remote peer
// it also verifies peer's node type to what caller expects
// XXX place != ok (this is client, not server ?)
func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clusterName string) (accept *AcceptIdentification, err error) {
defer xerr.Contextf(&err, "%s: request identification", link)
......@@ -176,7 +178,7 @@ func IdentifyWith(expectPeerType NodeType, link *NodeLink, myInfo NodeInfo, clus
// XXX naming for RecvAndDecode and EncodeAndSend
// RecvAndDecode receives packet from conn and decodes it
func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interface{}
func RecvAndDecode(conn *Conn) (NEOPkt, error) {
pkt, err := conn.Recv()
if err != nil {
return nil, err
......@@ -194,8 +196,8 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf
}
// TODO use free-list for decoded packets + when possible decode in-place
pktObj := reflect.New(msgType).Interface().(NEOCodec)
_, err = pktObj.NEODecode(pkt.Payload())
pktObj := reflect.New(msgType).Interface().(NEOPkt)
_, err = pktObj.NEOPktDecode(pkt.Payload())
if err != nil {
// XXX -> ProtoError ?
return nil, &ConnError{Conn: conn, Op: "decode", Err: err}
......@@ -205,23 +207,23 @@ func RecvAndDecode(conn *Conn) (NEOEncoder, error) { // XXX NEOEncoder -> interf
}
// EncodeAndSend encodes pkt and sends it to conn
func EncodeAndSend(conn *Conn, pkt NEOEncoder) error {
msgCode, l := pkt.NEOEncodedInfo()
func EncodeAndSend(conn *Conn, pkt NEOPkt) error {
l := pkt.NEOPktEncodedLen()
buf := PktBuf{make([]byte, PktHeadLen + l)} // XXX -> freelist
h := buf.Header()
// h.ConnId will be set by conn.Send
h.MsgCode = hton16(msgCode)
h.MsgCode = hton16(pkt.NEOPktMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
pkt.NEOEncode(buf.Payload())
pkt.NEOPktEncode(buf.Payload())
return conn.Send(&buf) // XXX why pointer?
}
// Ask does simple request/response protocol exchange
// It expects the answer to be exactly of resp type and errors otherwise
func Ask(conn *Conn, req NEOEncoder, resp NEODecoder) error {
func Ask(conn *Conn, req NEOPkt, resp NEOPkt) error {
err := EncodeAndSend(conn, req)
if err != nil {
return err
......@@ -246,7 +248,7 @@ func (e *ProtoError) Error() string {
// Expect receives 1 packet and expects it to be exactly of msg type
// XXX naming (-> Recv1 ?)
func Expect(conn *Conn, msg NEODecoder) (err error) {
func Expect(conn *Conn, msg NEOPkt) (err error) {
pkt, err := conn.Recv()
if err != nil {
return err
......@@ -267,7 +269,7 @@ func Expect(conn *Conn, msg NEODecoder) (err error) {
// unexpected Error response
if msgType == reflect.TypeOf(Error{}) {
errResp := Error{}
_, err = errResp.NEODecode(pkt.Payload())
_, err = errResp.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
......@@ -282,7 +284,7 @@ func Expect(conn *Conn, msg NEODecoder) (err error) {
return &ProtoError{conn, fmt.Errorf("unexpected packet: %v", msgType)}
}
_, err = msg.NEODecode(pkt.Payload())
_, err = msg.NEOPktDecode(pkt.Payload())
if err != nil {
return &ProtoError{conn, err}
}
......
......@@ -312,7 +312,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
xid.TidBefore = true
}
var reply NEOEncoder
var reply NEOPkt
data, tid, err := stor.zstor.Load(xid)
if err != nil {
// TODO translate err to NEO protocol error codes
......@@ -334,7 +334,7 @@ func (stor *Storage) ServeClient(ctx context.Context, conn *Conn) {
EncodeAndSend(conn, reply) // XXX err
case *LastTransaction:
var reply NEOEncoder
var reply NEOPkt
lastTid, err := stor.zstor.LastTid()
if err != nil {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment