Commit 64513925 authored by Kirill Smelkov's avatar Kirill Smelkov

go/neo/neonet: Start (first draft)

Continue NEO/go with neonet - the layer to exchange messages in between
NEO nodes.

NEO/go shifts from thinking about NEO protocol logic as RPC to thinking
of it as more general network protocol and so settles to provide general
connection-oriented message exchange service. This way neonet provides
generic connection multiplexing on top of a single TCP node-node link.

Neonet compatibility with NEO/py depends on the following small NEO/py patch:

    kirr/neo@dd3bb8b4

which adjusts message ID a bit so it behaves like stream_id in HTTP/2:

    - always even for server initiated streams
    - always odd  for client initiated streams

and is incremented by += 2, instead of += 1 to maintain above invariant.

See http://navytux.spb.ru/~kirr/neo.html#development-overview (starting from
"Then comes the link layer which provides service to exchange messages over
network...") for the rationale.

Unfortunately current NEO/py maintainer is very much against merging that patch.

This patch brings in the core of neonet. Next patches will add initial
handshaking, user-level Send/Recv + Ask/Expect and "lightweight mode".

Some neonet core history:

lab.nexedi.com/kirr/neo/commit/6b9ed46d	X neonet: Avoid integer overflow on max packet length check
lab.nexedi.com/kirr/neo/commit/8eac771c	X neo/connection: Fix race between link.shutdown() and conn.lightClose()
lab.nexedi.com/kirr/neo/commit/8021a1d5	X rxghandoff
lab.nexedi.com/kirr/neo/commit/68738036	X ... but negative impact on separate client / server processes, strange ...
lab.nexedi.com/kirr/neo/commit/b0dda9d2	X serveRecv: help Go scheduler to switch to receiving G sooner
lab.nexedi.com/kirr/neo/commit/4989918a	X remove defer from rx/tx hot paths
lab.nexedi.com/kirr/neo/commit/e055406a	X no select for acceptq - similarly for rxq path
lab.nexedi.com/kirr/neo/commit/c28ad4d0	X Conn.Recv: receive without select
lab.nexedi.com/kirr/neo/commit/496bd425	X add benchmark RTT over plain net.Conn with serveRecv-style RX handler
lab.nexedi.com/kirr/neo/commit/9fa79958	X draft how to mark RX down without reallocating .rxdown
lab.nexedi.com/kirr/neo/commit/4324c812	X restore all Conn functionality
lab.nexedi.com/kirr/neo/commit/a8e61d2f	X serveSend is not needed
lab.nexedi.com/kirr/neo/commit/9d047b36	X recvPkt via only 1 syscall
lab.nexedi.com/kirr/neo/commit/b555a507	X baseline net RTT benchmark
lab.nexedi.com/kirr/neo/commit/91be5cdd	X everyone is listening from start; CloseAccept to disable listening - works
lab.nexedi.com/kirr/neo/commit/c2a1b63a	X naming: Packet = raw data; Message = meaningful object
lab.nexedi.com/kirr/neo/commit/6fd0c9be	X connection: Adding context to errors from NodeLink and Conn operations
lab.nexedi.com/kirr/neo/commit/65b17bdc	X rework Conn acceptance to be explicit via NodeLink.Accept
parent 5beab048
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package neonet provides service to exchange messages in a NEO network.
//
// A NEO node - node
// link (NodeLink) provides service for multiplexing several communication
// connections on top of it. Connections (Conn) in turn provide service to
// exchange NEO protocol messages.
//
// New connections can be created with link.NewConn(). Once connection is
// created and a message is sent over it, on peer's side another corresponding
// new connection can be accepted via link.Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
//
// See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO
// messages.
package neonet
// XXX neonet compatibility with NEO/py depends on the following small NEO/py patch:
//
// https://lab.nexedi.com/kirr/neo/commit/dd3bb8b4
//
// which adjusts message ID a bit so it behaves like stream_id in HTTP/2:
//
// - always even for server initiated streams
// - always odd for client initiated streams
//
// and is incremented by += 2, instead of += 1 to maintain above invariant.
//
// See http://navytux.spb.ru/~kirr/neo.html#development-overview (starting from
// "Then comes the link layer which provides service to exchange messages over
// network...") for the rationale.
//
// Unfortunately current NEO/py maintainer is very much against merging that patch.
import (
"errors"
"fmt"
"io"
"math"
"net"
//"runtime"
"sync"
"time"
"lab.nexedi.com/kirr/neo/go/internal/packed"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"github.com/someonegg/gocontainer/rbuf"
"lab.nexedi.com/kirr/go123/xbytes"
)
// NodeLink is a node-node link in NEO.
//
// A node-node link represents bidirectional symmetrical communication
// channel in between 2 NEO nodes. The link provides service for multiplexing
// several communication connections on top of the node-node link.
//
// New connection can be created with .NewConn() . Once connection is
// created and data is sent over it, on peer's side another corresponding
// new connection can be accepted via .Accept(), and all further communication
// send/receive exchange will be happening in between those 2 connections.
//
// A NodeLink has to be explicitly closed, once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct {
peerLink net.Conn // raw conn to peer
connMu sync.Mutex
connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv}
txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq)
acceptq chan *Conn // queue of incoming connections for Accept
axqWrite atomic32 // 1 while serveRecv is doing `acceptq <- ...`
axqRead atomic32 // +1 while Accept is doing `... <- acceptq`
axdownFlag atomic32 // 1 when AX is marked no longer operational
// axdown chan struct{} // ready when accept is marked as no longer operational
axdown1 sync.Once // CloseAccept may be called several times
down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due to both Close and IO error
downWg sync.WaitGroup // for activities at shutdown
errClose error // error got from peerLink.Close
errMu sync.Mutex
errRecv error // error got from recvPkt on shutdown
axclosed atomic32 // whether CloseAccept was called
closed atomic32 // whether Close was called
rxbuf rbuf.RingBuf // buffer for reading from peerLink
// scheduling optimization: whenever serveRecv sends to Conn.rxq
// receiving side must ack here to receive G handoff.
// See comments in serveRecv for details.
rxghandoff chan struct{}
}
// XXX rx handoff make latency better for serial request-reply scenario but
// does a lot of harm for case when there are several parallel requests -
// serveRecv after handing off is put to tail of current cpu runqueue - not
// receiving next requests and not spawning handlers for them, thus essential
// creating Head-of-line (HOL) blocking problem.
//
// XXX ^^^ problem reproducible on deco but not on z6001
const rxghandoff = true // XXX whether to do rxghandoff trick
// Conn is a connection established over NodeLink.
//
// Messages can be sent and received over it.
// Once connection is no longer needed it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct {
link *NodeLink
connId uint32
rxq chan *pktBuf // received packets for this Conn go here
rxqWrite atomic32 // 1 while serveRecv is doing `rxq <- ...`
rxqRead atomic32 // +1 while Conn.Recv is doing `... <- rxq`
rxdownFlag atomic32 // 1 when RX is marked no longer operational
// XXX ^^^ split to different cache lines?
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
// rxdown chan struct{} // ready when RX is marked no longer operational
rxdownOnce sync.Once // ----//---- XXX review
rxclosed atomic32 // whether CloseRecv was called
txerr chan error // transmit results for this Conn go back here
txdown chan struct{} // ready when Conn TX is marked as no longer operational
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
txclosed atomic32 // whether CloseSend was called
// closing Conn is shutdown + some cleanup work to remove it from
// link.connTab including arming timers etc. Let this work be spawned only once.
// (for Conn.Close to be valid called several times)
closeOnce sync.Once
}
var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrLinkManyConn = errors.New("too many opened connections")
var ErrClosedConn = errors.New("connection is closed")
// LinkError is returned by NodeLink operations.
type LinkError struct {
Link *NodeLink
Op string
Err error
}
// ConnError is returned by Conn operations.
type ConnError struct {
Link *NodeLink
ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
Op string
Err error
}
// _LinkRole is a role an end of NodeLink is intended to play.
type _LinkRole int
const (
_LinkServer _LinkRole = iota // link created as server
_LinkClient // link created as client
// for testing:
linkNoRecvSend _LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
linkFlagsMask _LinkRole = (1<<32 - 1) << 16
)
// newNodeLink makes a new NodeLink from already established net.Conn .
//
// Role specifies how to treat our role on the link - either as client or
// server. The difference in between client and server roles is in:
//
// how connection ids are allocated for connections initiated at our side:
// there is no conflict in identifiers if one side always allocates them as
// even (server) and its peer as odd (client).
//
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
var nextConnId uint32
switch role &^ linkFlagsMask {
case _LinkServer:
nextConnId = 0 // all initiated by us connId will be even
case _LinkClient:
nextConnId = 1 // ----//---- odd
default:
panic("invalid conn role")
}
nl := &NodeLink{
peerLink: conn,
connTab: map[uint32]*Conn{},
nextConnId: nextConnId,
acceptq: make(chan *Conn), // XXX +buf ?
txq: make(chan txReq),
rxghandoff: make(chan struct{}),
// axdown: make(chan struct{}),
down: make(chan struct{}),
}
if role&linkNoRecvSend == 0 {
nl.serveWg.Add(2)
go nl.serveRecv()
go nl.serveSend()
}
return nl
}
// newConn creates new Conn with id=connId and registers it into connTab.
// must be called with connMu held.
func (link *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{
link: link,
connId: connId,
rxq: make(chan *pktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf ?
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
txdown: make(chan struct{}),
// rxdown: make(chan struct{}),
}
link.connTab[connId] = c
return c
}
// NewConn creates new connection on top of node-node link.
func (link *NodeLink) NewConn() (*Conn, error) {
link.connMu.Lock()
//defer link.connMu.Unlock()
c, err := link._NewConn()
link.connMu.Unlock()
return c, err
}
func (link *NodeLink) _NewConn() (*Conn, error) {
if link.connTab == nil {
if link.closed.Get() != 0 {
return nil, link.err("newconn", ErrLinkClosed)
}
return nil, link.err("newconn", ErrLinkDown)
}
// nextConnId could wrap around uint32 limits - find first free slot to
// not blindly replace existing connection
for i := uint32(0); ; i++ {
_, exists := link.connTab[link.nextConnId]
if !exists {
break
}
link.nextConnId += 2
if i > math.MaxUint32 / 2 {
return nil, link.err("newconn", ErrLinkManyConn)
}
}
c := link.newConn(link.nextConnId)
link.nextConnId += 2
return c, nil
}
// shutdownAX marks acceptq as no longer operational and interrupts Accept.
func (link *NodeLink) shutdownAX() {
link.axdown1.Do(func() {
// close(link.axdown)
link.axdownFlag.Set(1) // XXX cmpxchg and return if already down?
// drain all connections from .acceptq:
// - something could be already buffered there
// - serveRecv could start writing acceptq at the same time we set axdownFlag; we derace it
for {
// if serveRecv is outside `.acceptq <- ...` critical
// region and fully drained - we are done.
// see description of the logic in shutdownRX
if link.axqWrite.Get() == 0 && len(link.acceptq) == 0 {
break
}
select {
case conn := <-link.acceptq:
// serveRecv already put at least 1 packet into conn.rxq before putting
// conn into .acceptq - shutting it down will send the error to peer.
conn.shutdownRX(errConnRefused)
link.connMu.Lock()
delete(link.connTab, conn.connId)
link.connMu.Unlock()
default:
// ok - continue spinning
}
}
// wakeup Accepts
for {
// similarly to above: .axdownFlag vs .axqRead
// see logic description in shutdownRX
if link.axqRead.Get() == 0 {
break
}
select {
case link.acceptq <- nil:
// ok - woken up
default:
// ok - continue spinning
}
}
})
}
// shutdown closes raw link to peer and marks NodeLink as no longer operational.
//
// it also shutdowns all opened connections over this node link.
func (nl *NodeLink) shutdown() {
nl.shutdownAX()
nl.downOnce.Do(func() {
close(nl.down)
// close actual link to peer. this will wakeup {send,recv}Pkt
// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
nl.errClose = nl.peerLink.Close()
nl.downWg.Add(1)
go func() {
defer nl.downWg.Done()
// wait for serve{Send,Recv} to complete before shutting connections down
//
// we have to do it so that e.g. serveSend has chance
// to return last error from sendPkt to requester.
nl.serveWg.Wait()
// clear + mark down .connTab + shutdown all connections
nl.connMu.Lock()
connTab := nl.connTab
nl.connTab = nil
nl.connMu.Unlock()
// conn.shutdown() outside of link.connMu lock
for _, conn := range connTab {
conn.shutdown()
}
}()
})
}
// CloseAccept instructs node link to not accept incoming connections anymore.
//
// Any blocked Accept() will be unblocked and return error.
// The peer will receive "connection refused" if it tries to connect after and
// for already-queued connection requests.
//
// It is safe to call CloseAccept several times.
func (link *NodeLink) CloseAccept() {
link.axclosed.Set(1)
link.shutdownAX()
}
// Close closes node-node link.
//
// All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error.
// Underlying raw connection is closed.
// It is safe to call Close several times.
func (link *NodeLink) Close() error {
link.axclosed.Set(1)
link.closed.Set(1)
link.shutdown()
link.downWg.Wait()
return link.err("close", link.errClose)
}
// shutdown marks connection as no longer operational and interrupts Send and Recv.
func (c *Conn) shutdown() {
c.shutdownTX()
c.shutdownRX(errConnClosed)
}
// shutdownTX marks TX as no longer operational and interrupts Send.
func (c *Conn) shutdownTX() {
c.txdownOnce.Do(func() {
close(c.txdown)
})
}
// shutdownRX marks .rxq as no longer operational and interrupts Recv.
func (c *Conn) shutdownRX(errMsg *proto.Error) {
c.rxdownOnce.Do(func() {
// close(c.rxdown) // wakeup Conn.Recv
c.downRX(errMsg)
})
}
// downRX marks .rxq as no longer operational.
//
// used in shutdownRX.
func (c *Conn) downRX(errMsg *proto.Error) {
// let serveRecv know RX is down for this connection
c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?
// drain all packets from .rxq:
// - something could be already buffered there
// - serveRecv could start writing rxq at the same time we set rxdownFlag; we derace it.
i := 0
for {
// we set .rxdownFlag=1 above.
// now if serveRecv is outside `.rxq <- ...` critical section we know it is either:
// - before it -> it will eventually see .rxdownFlag=1 and won't send pkt to rxq.
// - after it -> it already sent pkt to rxq and won't touch
// rxq until next packet (where it will hit "before it").
//
// when serveRecv stopped sending we know we are done draining when rxq is empty.
if c.rxqWrite.Get() == 0 && len(c.rxq) == 0 {
break
}
select {
case <-c.rxq:
c.rxack()
i++
default:
// ok - continue spinning
}
}
// if something was queued already there - reply "connection closed"
if i != 0 {
go c.link.replyNoConn(c.connId, errMsg)
}
// wakeup recvPkt(s)
for {
// similarly to above:
// we set .rxdownFlag=1
// now if recvPkt is outside `... <- .rxq` critical section we know that it is either:
// - before it -> it will eventually see .rxdownFlag=1 and won't try to read rxq.
// - after it -> it already read pktfrom rxq and won't touch
// rxq until next recvPkt (where it will hit "before it").
if c.rxqRead.Get() == 0 {
break
}
select {
case c.rxq <- nil:
// ok - woken up
default:
// ok - continue spinning
}
}
}
// time to keep record of a closed connection so that we can properly reply
// "connection closed" if a packet comes in with same connID.
var connKeepClosed = 1 * time.Minute
// CloseRecv closes reading end of connection.
//
// Any blocked Recv*() will be unblocked and return error.
// The peer will receive "connection closed" if it tries to send anything after
// and for messages already in local rx queue.
//
// It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() {
c.rxclosed.Set(1)
c.shutdownRX(errConnClosed)
}
// Close closes connection.
//
// Any blocked Send*() or Recv*() will be unblocked and return error.
//
// NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break node-node link framing.
//
// It is safe to call Close several times.
func (c *Conn) Close() error {
link := c.link
c.closeOnce.Do(func() {
c.rxclosed.Set(1)
c.txclosed.Set(1)
c.shutdown()
// adjust link.connTab
var tmpclosed *Conn
link.connMu.Lock()
if link.connTab != nil {
// connection was initiated by us - simply delete - we always
// know if a packet comes to such connection - it is closed.
//
// XXX checking vvv should be possible without connMu lock
if c.connId == link.nextConnId % 2 {
delete(link.connTab, c.connId)
// connection was initiated by peer which we accepted - put special
// "closed" connection into connTab entry for some time to reply
// "connection closed" if another packet comes to it.
//
// ( we cannot reuse same connection since after it is marked as
// closed Send refuses to work )
} else {
// delete(link.connTab, c.connId)
// XXX vvv was temp. disabled - costs a lot in 1req=1conn model
// c implicitly goes away from connTab
tmpclosed = link.newConn(c.connId)
}
}
link.connMu.Unlock()
if tmpclosed != nil {
tmpclosed.shutdownRX(errConnClosed)
time.AfterFunc(connKeepClosed, func() {
link.connMu.Lock()
delete(link.connTab, c.connId)
link.connMu.Unlock()
})
}
})
return nil
}
// ---- receive ----
// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept.
func (link *NodeLink) errAcceptShutdownAX() error {
switch {
case link.closed.Get() != 0:
return ErrLinkClosed
case link.axclosed.Get() != 0:
return ErrLinkNoListen
default:
// XXX do the same as in errRecvShutdown (check link.errRecv)
return ErrLinkDown
}
}
// Accept waits for and accepts incoming connection on top of node-node link.
func (link *NodeLink) Accept() (*Conn, error) {
// semantically equivalent to the following:
// ( this is hot path for py compatibility mode because new connection
// is established in every message and select hurts performance )
//
// select {
// case <-link.axdown:
// return nil, link.err("accept", link.errAcceptShutdownAX())
//
// case c := <-link.acceptq:
// return c, nil
// }
var conn *Conn
var err error
link.axqRead.Add(1)
axdown := link.axdownFlag.Get() != 0
if !axdown {
conn = <-link.acceptq
}
link.axqRead.Add(-1)
// in contrast to recvPkt we can decide about error after releasing axqRead
// reason: link is not going to be released to a free pool.
if axdown || conn == nil {
err = link.err("accept", link.errAcceptShutdownAX())
}
return conn, err
}
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt.
func (c *Conn) errRecvShutdown() error {
switch {
case c.rxclosed.Get() != 0:
return ErrClosedConn
case c.link.closed.Get() != 0:
return ErrLinkClosed
default:
// we have to check what was particular RX error on nodelink shutdown
// only do that once - after reporting RX error the first time
// tell client the node link is no longer operational.
var err error
c.rxerrOnce.Do(func() {
c.link.errMu.Lock()
err = c.link.errRecv
c.link.errMu.Unlock()
})
if err == nil {
err = ErrLinkDown
}
return err
}
}
// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*pktBuf, error) {
// semantically equivalent to the following:
// (this is hot path and select is not used for performance reason)
//
// select {
// case <-c.rxdown:
// return nil, c.err("recv", c.errRecvShutdown())
//
// case pkt := <-c.rxq:
// return pkt, nil
// }
var pkt *pktBuf
var err error
c.rxqRead.Add(1)
rxdown := c.rxdownFlag.Get() != 0
if !rxdown {
pkt = <-c.rxq
}
// decide about error while under rxqRead (if after - the Conn can go away to be released)
if rxdown || pkt == nil {
err = c.err("recv", c.errRecvShutdown())
}
c.rxqRead.Add(-1)
if err == nil {
c.rxack()
}
return pkt, err
}
// rxack unblocks serveRecv after it handed G to us.
// see comments about rxghandoff in serveRecv.
func (c *Conn) rxack() {
if !rxghandoff {
return
}
//fmt.Printf("conn: rxack <- ...\n")
c.link.rxghandoff <- struct{}{}
//fmt.Printf("\tconn: rxack <- ... ok\n")
}
// serveRecv handles incoming packets routing them to either appropriate
// already-established connection or, if node link is accepting incoming
// connections, to new connection put to accept queue.
func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done()
for {
// receive 1 packet
// NOTE if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
pkt, err := nl.recvPkt()
//fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err)
if err != nil {
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
nl.errMu.Lock()
nl.errRecv = err
nl.errMu.Unlock()
nl.shutdown()
return
}
// pkt.ConnId -> Conn
connId := packed.Ntoh32(pkt.Header().ConnId)
accept := false
nl.connMu.Lock()
// connTab is never nil here - because shutdown, before
// resetting it, waits for us to finish.
conn := nl.connTab[connId]
if conn == nil {
// message with connid for a stream initiated by peer
// it will be considered to be accepted (not if .axdown)
if connId % 2 != nl.nextConnId % 2 {
accept = true
conn = nl.newConn(connId)
}
// else it is message with connid that should be initiated by us
// leave conn=nil - we'll reply errConnClosed
}
nl.connMu.Unlock()
//fmt.Printf("%p\tconn: %v\n", nl, conn)
if conn == nil {
// see ^^^ "message with connid that should be initiated by us"
go nl.replyNoConn(connId, errConnClosed)
continue
}
// route packet to serving goroutine handler
//
// TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole link starve because of one connection)
//
// NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet
// there in acceptq.
conn.rxqWrite.Set(1)
rxdown := conn.rxdownFlag.Get() != 0
if !rxdown {
conn.rxq <- pkt
}
conn.rxqWrite.Set(0)
//fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept)
// conn exists, but rx is down - "connection closed"
// (this cannot happen for newly accepted connection)
if rxdown {
go nl.replyNoConn(connId, errConnClosed)
continue
}
// this packet established new connection - try to accept it
if accept {
nl.axqWrite.Set(1)
axdown := nl.axdownFlag.Get() != 0
if !axdown {
nl.acceptq <- conn
}
nl.axqWrite.Set(0)
// we are not accepting the connection
if axdown {
go nl.replyNoConn(connId, errConnRefused)
nl.connMu.Lock()
delete(nl.connTab, conn.connId)
nl.connMu.Unlock()
continue
}
}
//fmt.Printf("%p\tafter accept\n", nl)
// Normally serveRecv G will continue to run with G waking up
// on rxq/acceptq only being put into the runqueue of current proc.
// By default proc runq will execute only when sendRecv blocks
// again next time deep in nl.recvPkt(), but let's force the switch
// now without additional waiting to reduce latency.
// XXX bad - puts serveRecv to global runq thus with high p to switch M
//runtime.Gosched()
// handoff execution to receiving goroutine via channel.
// rest of serveRecv is put to current P local runq.
//
// details:
// - https://github.com/golang/go/issues/20168
// - https://github.com/golang/go/issues/15110
//
// see BenchmarkTCPlo* - for serveRecv style RX handoff there
// cuts RTT from 12.5μs to 6.6μs (RTT without serveRecv style G is 4.8μs)
//
// TODO report upstream
if rxghandoff {
//fmt.Printf("serveRecv: <-rxghandoff\n")
<-nl.rxghandoff
//fmt.Printf("\tserveRecv: <-rxghandoff ok\n")
}
/*
// XXX goes away in favour of .rxdownFlag; reasons
// - no select
//
// XXX review synchronization via flags for correctness (e.g.
// if both G were on the same runqueue, spinning in G1 will
// prevent G2 progress)
//
// XXX maybe we'll need select if we add ctx into Send/Recv.
// don't even try `conn.rxq <- ...` if conn.rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv
// could receive something on rxdown Conn sometimes )
rxdown := false
select {
case <-conn.rxdown:
rxdown = true
default:
// ok
}
// route packet to serving goroutine handler
//
// TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole link starve because of one connection)
//
// NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet
// there in acceptq.
if !rxdown {
// XXX can avoid select here: if conn closer cares to drain rxq (?)
select {
case <-conn.rxdown:
rxdown = true
case conn.rxq <- pkt:
// ok
}
}
...
// this packet established new connection - try to accept it
if accept {
// don't even try `link.acceptq <- ...` if link.axdown is ready
// ( else since select is picking random ready variant Accept/serveRecv
// could receive something on axdown Link sometimes )
axdown := false
select {
case <-nl.axdown:
axdown = true
default:
// ok
}
// put conn to .acceptq
if !axdown {
// XXX can avoid select here if shutdownAX cares to drain acceptq (?)
select {
case <-nl.axdown:
axdown = true
case nl.acceptq <- conn:
//fmt.Printf("%p\t.acceptq <- conn ok\n", nl)
// ok
}
}
// we are not accepting the connection
if axdown {
conn.shutdownRX(errConnRefused)
nl.connMu.Lock()
delete(nl.connTab, conn.connId)
nl.connMu.Unlock()
}
}
*/
}
}
// ---- network replies for closed / refused connections ----
var errConnClosed = &proto.Error{proto.PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &proto.Error{proto.PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (link *NodeLink) replyNoConn(connId uint32, errMsg proto.Msg) {
//fmt.Printf("%s .%d: -> replyNoConn %v\n", link, connId, errMsg)
link.sendMsg(connId, errMsg) // ignore errors
//fmt.Printf("%s .%d: replyNoConn(%v) -> %v\n", link, connId, errMsg, err)
}
// ---- transmit ----
// txReq is request to transmit a packet. Result error goes back to errch.
type txReq struct {
pkt *pktBuf
errch chan error
}
// errSendShutdown returns appropriate error when c.txdown is found ready in Send.
func (c *Conn) errSendShutdown() error {
switch {
case c.txclosed.Get() != 0:
return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that
// NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it.
case c.link.closed.Get() != 0:
return ErrLinkClosed
default:
return ErrLinkDown
}
}
// sendPkt sends raw packet via connection.
//
// on success pkt is freed.
func (c *Conn) sendPkt(pkt *pktBuf) error {
err := c.sendPkt2(pkt)
return c.err("send", err)
}
func (c *Conn) sendPkt2(pkt *pktBuf) error {
// connId must be set to one associated with this connection
if pkt.Header().ConnId != packed.Hton32(c.connId) {
panic("Conn.sendPkt: connId wrong")
}
var err error
select {
case <-c.txdown:
return c.errSendShutdown()
case c.link.txq <- txReq{pkt, c.txerr}:
select {
// tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrupt is
// .link.Close() which will close all other Conns.
//
// That's why we are also checking for c.txdown while waiting
// for reply from serveSend (and leave pkt to finish transmitting).
//
// NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.txdown:
// also poll c.txerr here because: when there is TX error,
// serveSend sends to c.txerr _and_ closes c.txdown .
// We still want to return actual transmission error to caller.
select {
case err = <-c.txerr:
return err
default:
return c.errSendShutdown()
}
case err = <-c.txerr:
return err
}
}
}
// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
defer nl.serveWg.Done()
for {
select {
case <-nl.down:
return
case txreq := <-nl.txq:
// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := nl.sendPkt(txreq.pkt)
//fmt.Printf("sendPkt -> %v\n", err)
// FIXME if several goroutines call conn.Send
// simultaneously - c.txerr even if buffered(1) will be
// overflown and thus deadlock here.
//
// -> require "Conn.Send must not be used concurrently"?
txreq.errch <- err
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
//
// XXX move to link.sendPkt?
if err != nil {
nl.shutdown()
return
}
}
}
}
// ---- raw IO ----
const dumpio = false
// sendPkt sends raw packet to peer.
//
// tx error, if any, is returned as is and is analyzed in serveSend.
//
// XXX pkt should be freed always or only on error?
func (nl *NodeLink) sendPkt(pkt *pktBuf) error {
if dumpio {
// XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
}
// NOTE Write writes data in full, or it is error
_, err := nl.peerLink.Write(pkt.data)
pkt.Free()
return err
}
var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer.
//
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*pktBuf, error) {
// FIXME if rxbuf is non-empty - first look there for header and then if
// we know size -> allocate pkt with that size.
pkt := pktAlloc(4096)
// len=4K but cap can be more since pkt is from pool - use all space to buffer reads
// XXX vvv -> pktAlloc() ?
data := pkt.data[:cap(pkt.data)]
n := 0 // number of pkt bytes obtained so far
// next packet could be already prefetched in part by previous read
if nl.rxbuf.Len() > 0 {
δn, _ := nl.rxbuf.Read(data[:proto.PktHeaderLen])
n += δn
}
// first read to read pkt header and hopefully rest of packet in 1 syscall
if n < proto.PktHeaderLen {
δn, err := io.ReadAtLeast(nl.peerLink, data[n:], proto.PktHeaderLen - n)
if err != nil {
return nil, err
}
n += δn
}
pkth := pkt.Header()
msgLen := packed.Ntoh32(pkth.MsgLen)
if msgLen > proto.PktMaxSize - proto.PktHeaderLen {
return nil, ErrPktTooBig
}
pktLen := int(proto.PktHeaderLen + msgLen) // whole packet length
// resize data if we don't have enough room in it
data = xbytes.Resize(data, pktLen)
data = data[:cap(data)]
// we might have more data already prefetched in rxbuf
if nl.rxbuf.Len() > 0 {
δn, _ := nl.rxbuf.Read(data[n:pktLen])
n += δn
}
// read rest of pkt data, if we need to
if n < pktLen {
δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktLen - n)
if err != nil {
return nil, err
}
n += δn
}
// put overread data into rxbuf for next reader
if n > pktLen {
nl.rxbuf.Write(data[pktLen:n])
}
// fixup data/pkt
data = data[:n]
pkt.data = data
if dumpio {
// XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
}
return pkt, nil
}
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ----
// LocalAddr returns local address of the underlying link to peer.
func (link *NodeLink) LocalAddr() net.Addr {
return link.peerLink.LocalAddr()
}
// RemoteAddr returns remote address of the underlying link to peer.
func (link *NodeLink) RemoteAddr() net.Addr {
return link.peerLink.RemoteAddr()
}
// Link returns underlying NodeLink of this connection.
func (c *Conn) Link() *NodeLink {
return c.link
}
// ConnID returns connection identifier used for the connection.
func (c *Conn) ConnID() uint32 {
return c.connId
}
// ---- for convenience: String / Error / Cause ----
func (link *NodeLink) String() string {
s := fmt.Sprintf("%s - %s", link.LocalAddr(), link.RemoteAddr())
return s // XXX add "(closed)" if link is closed ?
// XXX other flags e.g. (down) ?
}
func (c *Conn) String() string {
s := fmt.Sprintf("%s .%d", c.link, c.connId)
return s // XXX add "(closed)" if c is closed ?
}
func (e *LinkError) Error() string {
return fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
}
func (e *ConnError) Error() string {
return fmt.Sprintf("%s .%d: %s: %s", e.Link, e.ConnId, e.Op, e.Err)
}
func (e *LinkError) Cause() error { return e.Err }
func (e *ConnError) Cause() error { return e.Err }
func (nl *NodeLink) err(op string, e error) error {
if e == nil {
return nil
}
return &LinkError{Link: nl, Op: op, Err: e}
}
func (c *Conn) err(op string, e error) error {
if e == nil {
return nil
}
return &ConnError{Link: c.link, ConnId: c.connId, Op: op, Err: e}
}
// ---- exchange of messages ----
// msgPack allocates pktBuf and encodes msg into it.
func msgPack(connId uint32, msg proto.Msg) *pktBuf {
l := msg.NEOMsgEncodedLen()
buf := pktAlloc(proto.PktHeaderLen + l)
h := buf.Header()
h.ConnId = packed.Hton32(connId)
h.MsgCode = packed.Hton16(msg.NEOMsgCode())
h.MsgLen = packed.Hton32(uint32(l)) // XXX casting: think again
msg.NEOMsgEncode(buf.Payload())
return buf
}
// TODO msgUnpack
// sendMsg sends message with specified connection ID.
//
// it encodes message into packet, sets header appropriately and sends it.
//
// it is ok to call sendMsg in parallel with serveSend.
func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error {
buf := msgPack(connId, msg)
return link.sendPkt(buf) // XXX more context in err? (msg type)
// FIXME ^^^ shutdown whole link on error
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
import (
"bytes"
"context"
"io"
"net"
"runtime"
"testing"
"time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/internal/packed"
"lab.nexedi.com/kirr/neo/go/neo/proto"
"github.com/kylelemons/godebug/pretty"
)
func xclose(c io.Closer) {
err := c.Close()
exc.Raiseif(err)
}
func xnewconn(nl *NodeLink) *Conn {
c, err := nl.NewConn()
exc.Raiseif(err)
return c
}
func xaccept(nl *NodeLink) *Conn {
c, err := nl.Accept()
exc.Raiseif(err)
return c
}
func xsendPkt(c interface{ sendPkt(*pktBuf) error }, pkt *pktBuf) {
err := c.sendPkt(pkt)
exc.Raiseif(err)
}
func xrecvPkt(c interface{ recvPkt() (*pktBuf, error) }) *pktBuf {
pkt, err := c.recvPkt()
exc.Raiseif(err)
return pkt
}
func xwait(w interface{ Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
func gox(wg interface{ Go(func() error) }, xf func()) {
wg.Go(exc.Funcx(xf))
}
// xlinkError verifies that err is *LinkError and returns err.Err .
func xlinkError(err error) error {
le, ok := err.(*LinkError)
if !ok {
exc.Raisef("%#v is not *LinkError", err)
}
return le.Err
}
// xconnError verifies that err is *ConnError and returns err.Err .
func xconnError(err error) error {
ce, ok := err.(*ConnError)
if !ok {
exc.Raisef("%#v is not *ConnError", err)
}
return ce.Err
}
// Prepare pktBuf with content.
func _mkpkt(connid uint32, msgcode uint16, payload []byte) *pktBuf {
pkt := &pktBuf{make([]byte, proto.PktHeaderLen+len(payload))}
h := pkt.Header()
h.ConnId = packed.Hton32(connid)
h.MsgCode = packed.Hton16(msgcode)
h.MsgLen = packed.Hton32(uint32(len(payload)))
copy(pkt.Payload(), payload)
return pkt
}
func (c *Conn) mkpkt(msgcode uint16, payload []byte) *pktBuf {
// in Conn exchange connid is automatically set by Conn.sendPkt
return _mkpkt(c.connId, msgcode, payload)
}
// Verify pktBuf is as expected.
func xverifyPkt(pkt *pktBuf, connid uint32, msgcode uint16, payload []byte) {
errv := xerr.Errorv{}
h := pkt.Header()
// TODO include caller location
if packed.Ntoh32(h.ConnId) != connid {
errv.Appendf("header: unexpected connid %v (want %v)", packed.Ntoh32(h.ConnId), connid)
}
if packed.Ntoh16(h.MsgCode) != msgcode {
errv.Appendf("header: unexpected msgcode %v (want %v)", packed.Ntoh16(h.MsgCode), msgcode)
}
if packed.Ntoh32(h.MsgLen) != uint32(len(payload)) {
errv.Appendf("header: unexpected msglen %v (want %v)", packed.Ntoh32(h.MsgLen), len(payload))
}
if !bytes.Equal(pkt.Payload(), payload) {
errv.Appendf("payload differ:\n%s",
pretty.Compare(string(payload), string(pkt.Payload())))
}
exc.Raiseif(errv.Err())
}
// Verify pktBuf to match expected message.
func xverifyPktMsg(pkt *pktBuf, connid uint32, msg proto.Msg) {
data := make([]byte, msg.NEOMsgEncodedLen())
msg.NEOMsgEncode(data)
xverifyPkt(pkt, connid, msg.NEOMsgCode(), data)
}
// delay a bit.
//
// needed e.g. to test Close interaction with waiting read or write
// (we cannot easily sync and make sure e.g. read is started and became asleep)
//
// XXX JM suggested to really wait till syscall starts this way:
// - via polling get traceback for thread that is going to call syscall and eventually block
// - if from that traceback we can see that blocking syscall is already called
// -> this way we can know that it is already blocking and thus sleep-hack can be avoided
// this can be done via runtime/pprof -> "goroutine" predefined profile
func tdelay() {
time.Sleep(1 * time.Millisecond)
}
// create NodeLinks connected via net.Pipe
func _nodeLinkPipe(flags1, flags2 _LinkRole) (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe()
nl1 = newNodeLink(node1, _LinkClient|flags1)
nl2 = newNodeLink(node2, _LinkServer|flags2)
return nl1, nl2
}
func nodeLinkPipe() (nl1, nl2 *NodeLink) {
return _nodeLinkPipe(0, 0)
}
func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
// Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl1)
})
pkt, err := nl1.recvPkt()
if !(pkt == nil && err == io.ErrClosedPipe) {
t.Fatalf("NodeLink.recvPkt() after close: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl2)
// Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl1)
})
pkt = &pktBuf{[]byte("data")}
err = nl1.sendPkt(pkt)
if err != io.ErrClosedPipe {
t.Fatalf("NodeLink.sendPkt() after close: err = %v", err)
}
xwait(wg)
xclose(nl2)
// {Close,CloseAccept} vs Accept
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
c, err := nl2.Accept()
if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
}
gox(wg, func() {
tdelay()
nl1.CloseAccept()
})
c, err = nl1.Accept()
if !(c == nil && xlinkError(err) == ErrLinkNoListen) {
t.Fatalf("NodeLink.Accept() after CloseAccept: conn = %v, err = %v", c, err)
}
xwait(wg)
// nl1 is now not accepting connections - because it was CloseAccept'ed
// check further Accept behaviour.
c, err = nl1.Accept()
if !(c == nil && xlinkError(err) == ErrLinkNoListen) {
t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err)
}
xclose(nl1)
// Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
pkt, err = nl1.recvPkt()
if !(pkt == nil && err == io.EOF) { // NOTE io.EOF on Read per io.Pipe
t.Fatalf("NodeLink.recvPkt() after peer shutdown: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
// Close vs sendPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(nl2)
})
pkt = &pktBuf{[]byte("data")}
err = nl1.sendPkt(pkt)
if err != io.ErrClosedPipe { // NOTE io.ErrClosedPipe on Write per io.Pipe
t.Fatalf("NodeLink.sendPkt() after peer shutdown: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
// raw exchange
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg, ctx := errgroup.WithContext(context.Background())
gox(wg, func() {
// send ping; wait for pong
pkt := _mkpkt(1, 2, []byte("ping"))
xsendPkt(nl1, pkt)
pkt = xrecvPkt(nl1)
xverifyPkt(pkt, 3, 4, []byte("pong"))
})
gox(wg, func() {
// wait for ping; send pong
pkt = xrecvPkt(nl2)
xverifyPkt(pkt, 1, 2, []byte("ping"))
pkt = _mkpkt(3, 4, []byte("pong"))
xsendPkt(nl2, pkt)
})
// close nodelinks either when checks are done, or upon first error
wgclose := &errgroup.Group{}
gox(wgclose, func() {
<-ctx.Done()
xclose(nl1)
xclose(nl2)
})
xwait(wg)
xwait(wgclose)
// ---- connections on top of nodelink ----
// Close vs recvPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(c)
})
pkt, err = c.recvPkt()
if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt() after close: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
xclose(nl2)
// Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
c = xnewconn(nl1)
wg = &errgroup.Group{}
gox(wg, func() {
tdelay()
xclose(c)
})
pkt = c.mkpkt(0, []byte("data"))
err = c.sendPkt(pkt)
if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt() after close: err = %v", err)
}
xwait(wg)
// NodeLink.Close vs Conn.sendPkt/recvPkt
c11 := xnewconn(nl1)
c12 := xnewconn(nl1)
wg = &errgroup.Group{}
gox(wg, func() {
pkt, err := c11.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
exc.Raisef("Conn.recvPkt() after NodeLink close: pkt = %v err = %v", pkt, err)
}
})
gox(wg, func() {
pkt := c12.mkpkt(0, []byte("data"))
err := c12.sendPkt(pkt)
if xconnError(err) != ErrLinkClosed {
exc.Raisef("Conn.sendPkt() after NodeLink close: err = %v", err)
}
})
tdelay()
xclose(nl1)
xwait(wg)
xclose(c11)
xclose(c12)
xclose(nl2)
// NodeLink.Close vs Conn.sendPkt/recvPkt and Accept on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, 0)
c21 := xnewconn(nl2)
c22 := xnewconn(nl2)
c23 := xnewconn(nl2)
wg = &errgroup.Group{}
var errRecv error
gox(wg, func() {
pkt, err := c21.recvPkt()
want1 := io.EOF // if recvPkt wakes up due to peer close
want2 := io.ErrClosedPipe // if recvPkt wakes up due to sendPkt wakes up first and closes nl1
cerr := xconnError(err)
if !(pkt == nil && (cerr == want1 || cerr == want2)) {
exc.Raisef("Conn.recvPkt after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
errRecv = cerr
})
gox(wg, func() {
pkt := c22.mkpkt(0, []byte("data"))
err := c22.sendPkt(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
if xconnError(err) != want {
exc.Raisef("Conn.sendPkt after peer NodeLink shutdown: %v", err)
}
})
gox(wg, func() {
conn, err := nl2.Accept()
if !(conn == nil && xlinkError(err) == ErrLinkDown) {
exc.Raisef("Accept after peer NodeLink shutdown: conn = %v err = %v", conn, err)
}
})
tdelay()
xclose(nl1)
xwait(wg)
// XXX denoise vvv
// NewConn after NodeLink shutdown
c, err = nl2.NewConn()
if xlinkError(err) != ErrLinkDown {
t.Fatalf("NewConn after NodeLink shutdown: %v", err)
}
// Accept after NodeLink shutdown
c, err = nl2.Accept()
if xlinkError(err) != ErrLinkDown {
t.Fatalf("Accept after NodeLink shutdown: conn = %v err = %v", c, err)
}
// recvPkt/sendPkt on another Conn
pkt, err = c23.recvPkt()
if !(pkt == nil && xconnError(err) == errRecv) {
t.Fatalf("Conn.recvPkt 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
err = c23.sendPkt(c23.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt 2 after peer NodeLink shutdown: %v", err)
}
// recvPkt/sendPkt error on second call
pkt, err = c21.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkDown) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
}
xclose(c23)
// recvPkt/sendPkt on closed Conn but not closed NodeLink
pkt, err = c23.recvPkt()
if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
}
err = c23.sendPkt(c23.mkpkt(0, []byte("data")))
if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close but only stopped NodeLink: %v", err)
}
xclose(nl2)
// recvPkt/sendPkt NewConn/Accept error after NodeLink close
pkt, err = c21.recvPkt()
if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
}
err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkClosed {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
}
c, err = nl2.NewConn()
if xlinkError(err) != ErrLinkClosed {
t.Fatalf("NewConn after NodeLink close: %v", err)
}
c, err = nl2.Accept()
if xlinkError(err) != ErrLinkClosed {
t.Fatalf("Accept after NodeLink close: %v", err)
}
xclose(c21)
xclose(c22)
// recvPkt/sendPkt error after Close & NodeLink shutdown
pkt, err = c21.recvPkt()
if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close and NodeLink close: pkt = %v err = %v", pkt, err)
}
err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close and NodeLink close: %v", err)
}
saveKeepClosed := connKeepClosed
connKeepClosed = 10 * time.Millisecond
// Conn accept + exchange
nl1, nl2 = nodeLinkPipe()
nl1.CloseAccept()
wg = &errgroup.Group{}
closed := make(chan int)
gox(wg, func() {
c := xaccept(nl2)
pkt := xrecvPkt(c)
xverifyPkt(pkt, c.connId, 33, []byte("ping"))
// change pkt a bit and send it back
xsendPkt(c, c.mkpkt(34, []byte("pong")))
// one more time
pkt = xrecvPkt(c)
xverifyPkt(pkt, c.connId, 35, []byte("ping2"))
xsendPkt(c, c.mkpkt(36, []byte("pong2")))
xclose(c)
closed <- 1
// once again as ^^^ but finish only with CloseRecv
c2 := xaccept(nl2)
pkt = xrecvPkt(c2)
xverifyPkt(pkt, c2.connId, 41, []byte("ping5"))
xsendPkt(c2, c2.mkpkt(42, []byte("pong5")))
c2.CloseRecv()
closed <- 2
// "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, c.mkpkt(38, []byte("pong3")))
pkt = xrecvPkt(c)
xverifyPktMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, c.mkpkt(40, []byte("pong4"))) // once again
pkt = xrecvPkt(c)
xverifyPktMsg(pkt, c.connId, errConnRefused)
xclose(c)
})
c1 := xnewconn(nl1)
xsendPkt(c1, c1.mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 34, []byte("pong"))
xsendPkt(c1, c1.mkpkt(35, []byte("ping2")))
pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 36, []byte("pong2"))
// "connection closed" after peer closed its end
<-closed
xsendPkt(c1, c1.mkpkt(37, []byte("ping3")))
pkt = xrecvPkt(c1)
xverifyPktMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, c1.mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c1)
xverifyPktMsg(pkt, c1.connId, errConnClosed)
// XXX also should get EOF on recv
// one more time but now peer does only .CloseRecv()
c2 := xnewconn(nl1)
xsendPkt(c2, c2.mkpkt(41, []byte("ping5")))
pkt = xrecvPkt(c2)
xverifyPkt(pkt, c2.connId, 42, []byte("pong5"))
<-closed
xsendPkt(c2, c2.mkpkt(41, []byte("ping6")))
pkt = xrecvPkt(c2)
xverifyPktMsg(pkt, c2.connId, errConnClosed)
xwait(wg)
// make sure entry for closed nl2.1 stays in nl2.connTab
nl2.connMu.Lock()
if cnl2 := nl2.connTab[1]; cnl2 == nil {
t.Fatal("nl2.connTab[1] == nil ; want \"closed\" entry")
}
nl2.connMu.Unlock()
// make sure "closed" entry goes away after its time
time.Sleep(3*connKeepClosed)
nl2.connMu.Lock()
if cnl2 := nl2.connTab[1]; cnl2 != nil {
t.Fatalf("nl2.connTab[1] == %v after close time window ; want nil", cnl2)
}
nl2.connMu.Unlock()
xclose(c1)
xclose(c2)
xclose(nl1)
xclose(nl2)
connKeepClosed = saveKeepClosed
// test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe()
wg = &errgroup.Group{}
replyOrder := map[uint16]struct { // "order" in which to process requests
start chan struct{} // processing starts when start chan is ready
next uint16 // after processing this switch to next
}{
2: {make(chan struct{}), 1},
1: {make(chan struct{}), 0},
}
close(replyOrder[2].start)
gox(wg, func() {
for _ = range replyOrder {
c := xaccept(nl2)
gox(wg, func() {
pkt := xrecvPkt(c)
n := packed.Ntoh16(pkt.Header().MsgCode)
x := replyOrder[n]
// wait before it is our turn & echo pkt back
<-x.start
xsendPkt(c, pkt)
xclose(c)
// tell next it can start
if x.next != 0 {
close(replyOrder[x.next].start)
}
})
}
})
c1 = xnewconn(nl1)
c2 = xnewconn(nl1)
xsendPkt(c1, c1.mkpkt(1, []byte("")))
xsendPkt(c2, c2.mkpkt(2, []byte("")))
// replies must be coming in reverse order
xechoWait := func(c *Conn, msgCode uint16) {
pkt := xrecvPkt(c)
xverifyPkt(pkt, c.connId, msgCode, []byte(""))
}
xechoWait(c2, 2)
xechoWait(c1, 1)
xwait(wg)
xclose(c1)
xclose(c2)
xclose(nl1)
xclose(nl2)
}
// ---- benchmarks ----
// rtt over chan - for comparison as base.
func benchmarkChanRTT(b *testing.B, c12, c21 chan byte) {
go func() {
for {
c, ok := <-c12
if !ok {
break
}
c21 <- c
}
}()
for i := 0; i < b.N; i++ {
c := byte(i)
c12 <- c
cc := <-c21
if cc != c {
b.Fatalf("sent %q != got %q", c, cc)
}
}
close(c12)
}
func BenchmarkSyncChanRTT(b *testing.B) {
benchmarkChanRTT(b, make(chan byte), make(chan byte))
}
func BenchmarkBufChanRTT(b *testing.B) {
benchmarkChanRTT(b, make(chan byte, 1), make(chan byte, 1))
}
// rtt over (acceptq, rxq) & ack channels - base comparison for link.Accept + conn.Recv .
func BenchmarkBufChanAXRXRTT(b *testing.B) {
axq := make(chan chan byte)
ack := make(chan byte)
go func() {
for {
// accept
rxq, ok := <-axq
if !ok {
break
}
// recv
c := <-rxq
// send back
ack <- c
}
}()
rxq := make(chan byte, 1) // buffered
for i := 0; i < b.N; i++ {
c := byte(i)
axq <- rxq
rxq <- c
cc := <-ack
if cc != c {
b.Fatalf("sent %q != got %q", c, cc)
}
}
close(axq)
}
var gosched = make(chan struct{})
// GoschedLocal is like runtime.Gosched but queues current goroutine on P-local
// runqueue instead of global runqueue.
// FIXME does not work - in the end goroutines appear on different Ps/Ms
func GoschedLocal() {
go func() {
gosched <- struct{}{}
}()
<-gosched
}
// rtt over net.Conn Read/Write
// if serveRecv=t - do RX path with additional serveRecv-style goroutine
func benchmarkNetConnRTT(b *testing.B, c1, c2 net.Conn, serveRecv bool, ghandoff bool) {
buf1 := make([]byte, 1)
buf2 := make([]byte, 1)
// make func to recv from c into buf via selected rx strategy
mkrecv := func(c net.Conn, buf []byte) func() (int, error) {
var recv func() (int, error)
if serveRecv {
type rx struct {
n int
erx error
}
rxq := make(chan rx, 1)
rxghandoff := make(chan struct{})
var serveRx func()
serveRx = func() {
for {
n, erx := io.ReadFull(c, buf)
//fmt.Printf("(go) %p rx -> %v %v\n", c, n, erx)
rxq <- rx{n, erx}
// good: reduce switch to receiver G latency
// see comment about rxghandoff in serveRecv
// in case of TCP/loopback saves ~5μs
if ghandoff {
<-rxghandoff
}
// stop on first error
if erx != nil {
return
}
if false {
// bad - puts G in global runq and so it changes M
runtime.Gosched()
}
if false {
// bad - same as runtime.Gosched
GoschedLocal()
}
if false {
// bad - in the end Gs appear on different Ms
go serveRx()
return
}
}
}
go serveRx()
recv = func() (int, error) {
r := <-rxq
if ghandoff {
rxghandoff <- struct{}{}
}
return r.n, r.erx
}
} else {
recv = func() (int, error) {
return io.ReadFull(c, buf)
}
}
return recv
}
recv1 := mkrecv(c1, buf1)
recv2 := mkrecv(c2, buf2)
b.ResetTimer()
go func() {
defer func() {
//fmt.Printf("2: close\n")
xclose(c2)
}()
for {
n, erx := recv2()
//fmt.Printf("2: rx %q\n", buf2[:n])
if n > 0 {
if n != len(buf2) {
b.Fatalf("read -> %d bytes ; want %d", n, len(buf2))
}
//fmt.Printf("2: tx %q\n", buf2)
_, etx := c2.Write(buf2)
if etx != nil {
b.Fatal(etx)
}
}
switch erx {
case nil:
// ok
case io.ErrClosedPipe, io.EOF: // net.Pipe, TCP
return
default:
b.Fatal(erx) // XXX cannot call b.Fatal from non-main goroutine?
}
}
}()
for i := 0; i < b.N; i++ {
c := byte(i)
buf1[0] = c
//fmt.Printf("1: tx %q\n", buf1)
_, err := c1.Write(buf1)
if err != nil {
b.Fatal(err)
}
n, err := recv1()
//fmt.Printf("1: rx %q\n", buf1[:n])
if !(n == len(buf1) && err == nil) {
b.Fatalf("read back: n=%v err=%v", n, err)
}
if buf1[0] != c {
b.Fatalf("sent %q != got %q", c, buf1[0])
}
}
//fmt.Printf("1: close\n")
xclose(c1)
}
// rtt over net.Pipe - for comparison as base.
func BenchmarkNetPipeRTT(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, false, false)
}
func BenchmarkNetPipeRTTsr(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, true, false)
}
func BenchmarkNetPipeRTTsrho(b *testing.B) {
c1, c2 := net.Pipe()
benchmarkNetConnRTT(b, c1, c2, true, true)
}
// xtcpPipe creates two TCP connections connected to each other via loopback.
func xtcpPipe() (*net.TCPConn, *net.TCPConn) {
// NOTE go sets TCP_NODELAY by default for TCP sockets
l, err := net.Listen("tcp", "localhost:")
exc.Raiseif(err)
c1, err := net.Dial("tcp", l.Addr().String())
exc.Raiseif(err)
c2, err := l.Accept()
exc.Raiseif(err)
xclose(l)
return c1.(*net.TCPConn), c2.(*net.TCPConn)
}
// rtt over TCP/loopback - for comparison as base.
func BenchmarkTCPlo(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, false, false)
}
func BenchmarkTCPlosr(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, true, false)
}
func BenchmarkTCPlosrho(b *testing.B) {
c1, c2 := xtcpPipe()
benchmarkNetConnRTT(b, c1, c2, true, true)
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
// syntax sugar for atomic load/store to raise signal/noise in logic
import "sync/atomic"
type atomic32 struct {
v int32 // struct member so `var a atomic32; if a == 0 ...` does not work
}
func (a *atomic32) Get() int32 {
return atomic.LoadInt32(&a.v)
}
func (a *atomic32) Set(v int32) {
atomic.StoreInt32(&a.v, v)
}
func (a *atomic32) Add(δ int32) int32 {
return atomic.AddInt32(&a.v, δ)
}
// Copyright (C) 2016-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package neonet
// packets and packet buffers management
import (
"fmt"
"reflect"
"sync"
"unsafe"
"lab.nexedi.com/kirr/go123/xbytes"
"lab.nexedi.com/kirr/neo/go/internal/packed"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// pktBuf is a buffer with full raw packet (header + payload).
//
// Allocate pktBuf via pktAlloc() and free via pktBuf.Free().
type pktBuf struct {
data []byte // whole packet data including all headers
}
// Header returns pointer to packet header.
func (pkt *pktBuf) Header() *proto.PktHeader {
// NOTE no need to check len(.data) < PktHeader:
// .data is always allocated with cap >= PktHeaderLen.
return (*proto.PktHeader)(unsafe.Pointer(&pkt.data[0]))
}
// Payload returns []byte representing packet payload.
func (pkt *pktBuf) Payload() []byte {
return pkt.data[proto.PktHeaderLen:]
}
// ---- pktBuf freelist ----
// pktBufPool is sync.Pool<pktBuf>.
var pktBufPool = sync.Pool{New: func() interface{} {
return &pktBuf{data: make([]byte, 0, 4096)}
}}
// pktAlloc allocates pktBuf with len=n.
//
// n must be >= sizeof(proto.PktHeader).
func pktAlloc(n int) *pktBuf {
if n < proto.PktHeaderLen {
panic("pktAlloc: n < sizeof(PktHeader)")
}
pkt := pktBufPool.Get().(*pktBuf)
pkt.data = xbytes.Realloc(pkt.data, n)
return pkt
}
// Free marks pkt as no longer needed.
func (pkt *pktBuf) Free() {
pktBufPool.Put(pkt)
}
// ---- pktBuf dump ----
// String dumps a packet in human-readable form.
func (pkt *pktBuf) String() string {
if len(pkt.data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < PktHeaderLen) % x", pkt.data)
}
h := pkt.Header()
s := fmt.Sprintf(".%d", packed.Ntoh32(h.ConnId))
msgCode := packed.Ntoh16(h.MsgCode)
msgLen := packed.Ntoh32(h.MsgLen)
data := pkt.Payload()
msgType := proto.MsgType(msgCode)
if msgType == nil {
s += fmt.Sprintf(" ? (%d) #%d [%d]: % x", msgCode, msgLen, len(data), data)
return s
}
// XXX dup wrt Conn.Recv
msg := reflect.New(msgType).Interface().(proto.Msg)
n, err := msg.NEOMsgDecode(data)
if err != nil {
s += fmt.Sprintf(" (%s) %v; #%d [%d]: % x", msgType, err, msgLen, len(data), data)
}
s += fmt.Sprintf(" %s %v", msgType.Name(), msg) // XXX or %+v better?
if n < len(data) {
tail := data[n:]
s += fmt.Sprintf(" ; [%d]tail: % x", len(tail), tail)
}
return s
}
// Dump dumps a packet in raw form.
func (pkt *pktBuf) Dump() string {
if len(pkt.data) < proto.PktHeaderLen {
return fmt.Sprintf("(! < pktHeaderLen) % x", pkt.data)
}
h := pkt.Header()
data := pkt.Payload()
return fmt.Sprintf(".%d (%d) #%d [%d]: % x",
packed.Ntoh32(h.ConnId), packed.Ntoh16(h.MsgCode), packed.Ntoh32(h.MsgLen), len(data), data)
}
......@@ -31,7 +31,8 @@
// A message type can be looked up by message code with MsgType.
//
// The proto packages provides only message definitions and low-level
// primitives for their marshalling.
// primitives for their marshalling. Package lab.nexedi.com/kirr/neo/go/neo/neonet
// provides actual service for message exchange over network.
package proto
// This file defines everything that relates to messages on the wire.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment