Commit 21e815b1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5dd5eaf4
...@@ -172,19 +172,19 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -172,19 +172,19 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen) //n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen]) n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ? -> (?) framing error
} }
pkth := pkt.Header() pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ? // XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen { if ntoh32(pkth.Len) < PktHeadLen {
// TODO err + close nodelink (framing broken) // TODO framing error -> nl.CloseWithError(err)
panic("TODO pkt.Len < PktHeadLen") // XXX err (length is a whole packet len with header) panic("TODO pkt.Len < PktHeadLen") // length is a whole packet len with header
} }
if ntoh32(pkth.Len) > MAX_PACKET_SIZE { if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
// TODO err + close nodelink (framing broken) (?) // TODO framing error -> nl.CloseWithError(err)
panic("TODO message too big") // XXX err panic("TODO message too big")
} }
if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) { if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
...@@ -200,7 +200,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -200,7 +200,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
if n < len(pkt.Data) { if n < len(pkt.Data) {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:]) _, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
if err != nil { if err != nil {
return nil, err // XXX err adjust ? return nil, err // XXX err adjust ? -> (?) framing error
} }
} }
...@@ -247,7 +247,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -247,7 +247,7 @@ func (nl *NodeLink) serveRecv() {
// XXX check err actually what is on interrupt? // XXX check err actually what is on interrupt?
return return
} }
panic(err) // XXX err panic(err) // XXX err -> if !temporary -> nl.closeWithError(err)
} }
// pkt.ConnId -> Conn // pkt.ConnId -> Conn
...@@ -438,7 +438,8 @@ func (l *Listener) Accept() (*NodeLink, error) { ...@@ -438,7 +438,8 @@ func (l *Listener) Accept() (*NodeLink, error) {
return NewNodeLink(peerConn, LinkServer), nil return NewNodeLink(peerConn, LinkServer), nil
} }
// TODO +tls.Config +ctx // TODO +tls.Config
// TODO +ctx -> no as .Close() will interrupt all .Accept()
func Listen(network, laddr string) (*Listener, error) { func Listen(network, laddr string) (*Listener, error) {
l, err := net.Listen(network, laddr) l, err := net.Listen(network, laddr)
if err != nil { if err != nil {
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// NEO | Connection management. Tests // NEO. Connection management. Tests
package neo package neo
......
...@@ -35,5 +35,3 @@ func (pkt *PktBuf) Header() *PktHead { ...@@ -35,5 +35,3 @@ func (pkt *PktBuf) Header() *PktHead {
func (pkt *PktBuf) Payload() []byte { func (pkt *PktBuf) Payload() []byte {
return pkt.Data[PktHeadLen:] return pkt.Data[PktHeadLen:]
} }
// NEO. Protocol description // NEO. Protocol description
package neo package neo
//package proto
/*
import (
. "../"
)
*/
const ( const (
PROTOCOL_VERSION = 8 PROTOCOL_VERSION = 8
MIN_PACKET_SIZE = 10 // XXX link this to len(pkthead) ? MIN_PACKET_SIZE = 10 // XXX unsafe.Sizeof(PktHead{}) give _typed_ constant (uintptr)
PktHeadLen = MIN_PACKET_SIZE // TODO link this to PktHead.Encode/Decode size PktHeadLen = MIN_PACKET_SIZE // TODO link this to PktHead.Encode/Decode size ?
MAX_PACKET_SIZE = 0x4000000 MAX_PACKET_SIZE = 0x4000000
RESPONSE_MASK = 0x800 RESPONSE_MASK = 0x8000
) )
type ErrorCode int type ErrorCode int
...@@ -100,7 +93,7 @@ type Checksum [20]byte ...@@ -100,7 +93,7 @@ type Checksum [20]byte
// Zero value means "invalid id" (<-> None in py.PPTID) // Zero value means "invalid id" (<-> None in py.PPTID)
type PTid uint64 // XXX move to common place ? type PTid uint64 // XXX move to common place ?
// TODO None encodes as '\xff' * 8 (XXX use nan for None ?) // TODO None encodes as '\xff' * 8 (-> use NaN for None)
type Float64 float64 type Float64 float64
// NOTE original NodeList = []NodeInfo // NOTE original NodeList = []NodeInfo
...@@ -114,8 +107,8 @@ type NodeInfo struct { ...@@ -114,8 +107,8 @@ type NodeInfo struct {
//type CellList []struct { //type CellList []struct {
type CellInfo struct { type CellInfo struct {
UUID UUID // XXX maybe simply 'UUID' ? UUID
CellState CellState // ----///---- CellState
} }
//type RowList []struct { //type RowList []struct {
...@@ -127,7 +120,7 @@ type RowInfo struct { ...@@ -127,7 +120,7 @@ type RowInfo struct {
// XXX link request <-> answer ? // XXX link request <-> answer ?
// TODO ensure len(encoded packet header) == 10 // XXX naming -> PktHeader ?
type PktHead struct { type PktHead struct {
ConnId be32 // NOTE is .msgid in py ConnId be32 // NOTE is .msgid in py
MsgCode be16 MsgCode be16
...@@ -169,7 +162,7 @@ type RequestIdentification struct { ...@@ -169,7 +162,7 @@ type RequestIdentification struct {
ProtocolVersion uint32 // TODO py.PProtocol upon decoding checks for != PROTOCOL_VERSION ProtocolVersion uint32 // TODO py.PProtocol upon decoding checks for != PROTOCOL_VERSION
NodeType NodeType // XXX name NodeType NodeType // XXX name
UUID UUID UUID UUID
Address // where requesting node is also accepting connectios Address // where requesting node is also accepting connections
Name string Name string
IdTimestamp Float64 IdTimestamp Float64
} }
...@@ -182,7 +175,7 @@ type AcceptIdentification struct { ...@@ -182,7 +175,7 @@ type AcceptIdentification struct {
NumPartitions uint32 // PNumber NumPartitions uint32 // PNumber
NumReplicas uint32 // PNumber NumReplicas uint32 // PNumber
YourUUID UUID YourUUID UUID
Primary Address // TODO Primary Address
KnownMasterList []struct { KnownMasterList []struct {
Address Address
UUID UUID UUID UUID
...@@ -664,7 +657,7 @@ type AnswerObjectUndoSerial struct { ...@@ -664,7 +657,7 @@ type AnswerObjectUndoSerial struct {
} }
} }
// Ask a storage is oid is locked by another transaction. // Ask a storage if oid is locked by another transaction.
// C -> S // C -> S
// Answer whether a transaction holds the write lock for requested object. // Answer whether a transaction holds the write lock for requested object.
type HasLock struct { type HasLock struct {
......
// Copyright (C) 2016 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// NEO. Protocol definition. Tests
package neo
import (
"testing"
"unsafe"
)
func TestPktHeader(t *testing.T) {
// make sure PktHeader is really packed
if unsafe.Sizeof(PktHead{}) != 10 {
t.Fatalf("sizeof(PktHead) = %v ; want 10", unsafe.Sizeof(PktHead{}))
}
}
// Copyright (C) 2016 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// NEO. Protocol definition. Code generator
// TODO text what it does (generates code for proto.go) // TODO text what it does (generates code for proto.go)
// +build ignore // +build ignore
......
// TODO copyright / license
package neo
import (
"context"
"fmt"
)
// Server is an interface that represents networked server
type Server interface {
// ServeLink serves already established nodelink (connection) in a blocking way.
// ServeLink is usually run in separate goroutine
ServeLink(ctx context.Context, link *NodeLink)
}
// Run service on a listener
// - accept incoming connection on the listener
// - for every accepted connection spawn srv.ServeLink() in separate goroutine.
//
// the listener is closed when Serve returns.
func Serve(ctx context.Context, l *Listener, srv Server) error {
fmt.Printf("stor: serving on %s ...\n", l.Addr())
// close listener when either cancelling or returning (e.g. due to an error)
// ( when cancelling - listener close will signal to all accepts to
// terminate with an error )
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
case <-retch:
}
l.Close() // XXX err
}()
// main Accept -> ServeLink loop
for {
link, err := l.Accept()
if err != nil {
// TODO err == closed <-> ctx was cancelled
// TODO err -> net.Error && .Temporary() -> some throttling
return err
}
go srv.ServeLink(ctx, link)
}
}
// TODO text
// XXX move -> generic place ?
// XXX split -> separate Listen() & Serve()
func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
l, err := Listen(net_, laddr)
if err != nil {
return err
}
// TODO set keepalive on l
// TODO if TLS config -> tls.NewListener()
return Serve(ctx, l, srv)
}
// TODO copyright / license // Copyright (C) 2016 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 2, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// TODO text
package neo package neo
...@@ -24,8 +36,11 @@ type Buffer struct { ...@@ -24,8 +36,11 @@ type Buffer struct {
} }
*/ */
func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) { func (stor *StorageApplication) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("stor: serving new client %s <-> %s\n", conn.LocalAddr(), conn.RemoteAddr()) fmt.Printf("stor: serving new node %s <-> %s\n", link.peerLink.LocalAddr(), link.peerLink.RemoteAddr())
//fmt.Fprintf(conn, "Hello up there, you address is %s\n", conn.RemoteAddr()) // XXX err //fmt.Fprintf(conn, "Hello up there, you address is %s\n", conn.RemoteAddr()) // XXX err
//conn.Close() // XXX err //conn.Close() // XXX err
...@@ -41,62 +56,3 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) { ...@@ -41,62 +56,3 @@ func (stor *StorageApplication) ServeConn(ctx context.Context, conn net.Conn) {
//recvPkt() //recvPkt()
} }
// ----------------------------------------
// Server is an interface that represents networked server XXX text
type Server interface {
// ServeConn serves already established connection in a blocking way.
// ServeConn is usually run in separate goroutine XXX text
ServeConn(ctx context.Context, conn net.Conn) // XXX error ?
}
// Run service on a listener
// - accept incoming connection on the listener
// - for every accepted connection spawn srv.ServeConn() in separate goroutine.
//
// the listener is closed when Serve returns.
// XXX move -> generic place ?
func Serve(ctx context.Context, l net.Listener, srv Server) error {
fmt.Printf("stor: serving on %s ...\n", l.Addr())
// close listener when either cancelling or returning (e.g. due to an error)
// ( when cancelling - listener close will signal to all accepts to
// terminate with an error )
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
case <-retch:
}
l.Close() // XXX err
}()
// main Accept -> ServeConn loop
for {
conn, err := l.Accept()
if err != nil {
// TODO err == closed <-> ctx was cancelled
// TODO err -> net.Error && .Temporary() -> some throttling
return err
}
go srv.ServeConn(ctx, conn)
}
}
// TODO text
// XXX move -> generic place ?
// XXX split -> separate Listen() & Serve()
func ListenAndServe(ctx context.Context, net_, laddr string, srv Server) error {
l, err := net.Listen(net_, laddr)
if err != nil {
return err
}
// TODO set keepalive on l
// TODO if TLS config -> tls.NewListener()
return Serve(ctx, l, srv)
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment