// Copyright (C) 2016-2018  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

// Package neonet provides service to establish links and exchange messages in
// a NEO network.
//
// A NEO node-node link can be established with DialLink and ListenLink,
// similarly to how it is done in the standard package net. Once established, a
// link (NodeLink) provides service for multiplexing several communication
// connections on top of it. Connections (Conn) in turn provide service to
// exchange NEO protocol messages.
//
// New connections can be created with link.NewConn(). Once a connection is
// created and a message is sent over it, a corresponding new connection can be
// accepted on the peer's side via link.Accept(), and all further send/receive
// exchange happens between those 2 connections.
//
// Use conn.Send and conn.Recv to actually exchange messages. See Conn
// documentation for other message-exchange utilities like Ask and Expect.
//
// See also package lab.nexedi.com/kirr/neo/go/neo/proto for definition of NEO
// messages.
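//
// For example, a full-mode exchange over an established link could look like
// the following sketch (error handling elided; req and resp denote values of
// concrete proto.Msg types):
//
//	// requester side
//	c, _ := link.NewConn()
//	err := c.Ask(req, resp) // send req; decode the peer's reply into resp
//	c.Close()
//
//	// peer side
//	c, _ := link.Accept()
//	req, err := c.Recv()    // req is the message the peer sent to us
//	err = c.Send(resp)
//	c.Close()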
//
//
// Lightweight mode
//
// In situations when created connections are used to send/receive only 1
// request/response, the overhead to create and shut down full connections can
// be too high. Unfortunately this is exactly the mode that is currently
// primarily used for compatibility with NEO/py. To help mitigate the overhead
// in such scenarios, a lightweight connections mode is provided:
//
// At the requester side, one message can be sent over the node link with
// link.Send1. Internally a connection will be created and then shut down, but
// since the code manages the whole process internally and does not expose the
// connection to the user, it can optimize those operations significantly.
// Similarly link.Ask1 sends 1 request, receives 1 response, and then puts the
// connection back into a pool for later reuse.
//
// At the receiver side, link.Recv1 accepts a connection together with the
// first message the remote peer sent us when establishing it, and wraps the
// result into a Request object. The Request contains the received message and,
// internally, the connection. A response can be sent back via Request.Reply.
// Then, once Request.Close is called, the connection object that was accepted
// is immediately put back into the pool for later reuse.
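//
// For example (a sketch; req and resp denote values of concrete proto.Msg
// types, error handling elided):
//
//	// requester side
//	err := link.Ask1(req, resp) // 1 request -> 1 response decoded into resp
//
//	// receiver side
//	r, err := link.Recv1()      // r.Msg is the message the peer sent to us
//	err = r.Reply(resp)
//	r.Close()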
package neonet

// XXX neonet compatibility with NEO/py depends on the following small NEO/py patch:
//
//	https://lab.nexedi.com/kirr/neo/commit/dd3bb8b4
//
// which adjusts message ID a bit so it behaves like stream_id in HTTP/2:
//
//	- always even for server-initiated streams
//	- always odd  for client-initiated streams
//
// and is incremented by 2, instead of 1, to maintain the above invariant.
//
// See http://navytux.spb.ru/~kirr/neo.html#development-overview (starting from
// "Then comes the link layer which provides service to exchange messages over
// network...") for the rationale.
//
// Unfortunately current NEO/py maintainer is very much against merging that patch.


//go:generate gotrace gen .

import (
	"errors"
	"fmt"
	"io"
	"math"
	"net"
	"reflect"
	//"runtime"
	"sync"
	"time"

	"lab.nexedi.com/kirr/neo/go/internal/packed"
	"lab.nexedi.com/kirr/neo/go/neo/proto"

	"github.com/someonegg/gocontainer/rbuf"

	"lab.nexedi.com/kirr/go123/xbytes"
)

// NodeLink is a node-node link in NEO.
//
// A node-node link represents a bidirectional symmetrical communication
// channel between 2 NEO nodes. The link provides service for multiplexing
// several communication connections on top of the node-node link.
//
// A new connection can be created with .NewConn(). Once a connection is
// created and data is sent over it, a corresponding new connection can be
// accepted on the peer's side via .Accept(), and all further send/receive
// exchange happens between those 2 connections.
//
// A NodeLink has to be explicitly closed, once it is no longer needed.
//
// It is safe to use NodeLink from multiple goroutines simultaneously.
type NodeLink struct {
	peerLink net.Conn // raw conn to peer

	connMu     sync.Mutex
	connTab    map[uint32]*Conn // connId -> Conn associated with connId
	nextConnId uint32           // next connId to use for Conn initiated by us

	serveWg sync.WaitGroup	// for serve{Send,Recv}
	txq	chan txReq	// tx requests from Conns go via here
				// (rx packets are routed to Conn.rxq)

	acceptq  chan *Conn	// queue of incoming connections for Accept
	axqWrite atomic32	//  1 while serveRecv is doing `acceptq <- ...`
	axqRead  atomic32	// +1 while Accept is doing `... <- acceptq`
	axdownFlag atomic32	//  1 when AX is marked no longer operational

//	axdown  chan struct{}	// ready when accept is marked as no longer operational
	axdown1 sync.Once	// CloseAccept may be called several times

	down     chan struct{}  // ready when NodeLink is marked as no longer operational
	downOnce sync.Once      // shutdown may be due to both Close and IO error
	downWg   sync.WaitGroup // for activities at shutdown
	errClose error          // error got from peerLink.Close

	errMu    sync.Mutex
	errRecv	 error		// error got from recvPkt on shutdown

	axclosed atomic32	// whether CloseAccept was called
	closed   atomic32	// whether Close was called

	rxbuf    rbuf.RingBuf	// buffer for reading from peerLink

	// scheduling optimization: whenever serveRecv sends to Conn.rxq
	// receiving side must ack here to receive G handoff.
	// See comments in serveRecv for details.
	rxghandoff chan struct{}
}

// XXX rx handoff makes latency better for the serial request-reply scenario but
// does a lot of harm for the case when there are several parallel requests -
// serveRecv after handing off is put to the tail of the current cpu runqueue -
// not receiving next requests and not spawning handlers for them, thus
// essentially creating a Head-of-line (HOL) blocking problem.
//
// XXX ^^^ problem reproducible on deco but not on z6001
const rxghandoff = true // XXX whether to do rxghandoff trick

// Conn is a connection established over NodeLink.
//
// Messages can be sent and received over it.
// Once connection is no longer needed it has to be closed.
//
// It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct {
	link      *NodeLink
	connId    uint32

	rxq	   chan *pktBuf	 // received packets for this Conn go here
	rxqWrite   atomic32	 //  1 while serveRecv is doing `rxq <- ...`
	rxqRead    atomic32      // +1 while Conn.Recv is doing `... <- rxq`
	rxdownFlag atomic32	 //  1 when RX is marked no longer operational
	// XXX ^^^ split to different cache lines?

	rxerrOnce sync.Once     // rx error is reported only once - then it is link down or closed XXX !light?

	// there are two modes in which a Conn can be used:
	// - full mode - where full Conn functionality is working, and
	// - light mode - where only a subset of the functionality is working
	//
	// the light mode is used to implement Recv1 & friends - there a connection
	// is used to send and/or receive at most 1 packet and then has to be
	// reused, for efficiency ideally without reallocating anything.
	//
	// everything below is used during !light mode only.

//	rxdown     chan struct{} // ready when RX is marked no longer operational
	rxdownOnce sync.Once	 // ----//----	XXX review
	rxclosed   atomic32	 // whether CloseRecv was called

	txerr     chan error	 // transmit results for this Conn go back here

	txdown     chan struct{} // ready when Conn TX is marked as no longer operational
	txdownOnce sync.Once	 // tx shutdown may be called by both Close and nodelink.shutdown
	txclosed   atomic32	 // whether CloseSend was called

	// closing a Conn is shutdown + some cleanup work to remove it from
	// link.connTab, including arming timers etc. Let this work be spawned only once.
	// (so that it is valid to call Conn.Close several times)
	closeOnce sync.Once
}

var ErrLinkClosed   = errors.New("node link is closed")	// operations on closed NodeLink
var ErrLinkDown     = errors.New("node link is down")	// e.g. due to IO error
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrLinkManyConn = errors.New("too many opened connections")
var ErrClosedConn   = errors.New("connection is closed")

// LinkError is returned by NodeLink operations.
type LinkError struct {
	Link *NodeLink
	Op   string
	Err  error
}

// ConnError is returned by Conn operations.
type ConnError struct {
	Link   *NodeLink
	ConnId uint32 // NOTE Conn's are reused - cannot use *Conn here
	Op     string
	Err    error
}

// _LinkRole is a role an end of NodeLink is intended to play
//
// XXX _LinkRole will need to become public again if _Handshake does.
type _LinkRole int
const (
	_LinkServer _LinkRole = iota // link created as server
	_LinkClient                  // link created as client

	// for testing:
	linkNoRecvSend _LinkRole = 1 << 16 // do not spawn serveRecv & serveSend
	linkFlagsMask  _LinkRole = (1<<32 - 1) << 16
)

// newNodeLink makes a new NodeLink from already established net.Conn .
//
// Role specifies how to treat our role on the link - either as client or
// server. The difference between the client and server roles is in:
//
//    how connection ids are allocated for connections initiated at our side:
//    there is no conflict in identifiers if one side always allocates them as
//    even (server) and its peer as odd (client).
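//
//    For example, connection ids of connections initiated over one link are
//    allocated as follows:
//
//        0, 2, 4, ...  for connections initiated by the server side
//        1, 3, 5, ...  for connections initiated by the client side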
//
// Usually server role should be used for connections created via
// net.Listen/net.Accept and client role for connections created via net.Dial.
//
// Though it is possible to wrap a just-established raw connection into a
// NodeLink, users should always use Handshake, which performs protocol
// handshaking first.
func newNodeLink(conn net.Conn, role _LinkRole) *NodeLink {
	var nextConnId uint32
	switch role &^ linkFlagsMask {
	case _LinkServer:
		nextConnId = 0 // all initiated by us connId will be even
	case _LinkClient:
		nextConnId = 1 // ----//---- odd
	default:
		panic("invalid conn role")
	}

	nl := &NodeLink{
		peerLink:   conn,
		connTab:    map[uint32]*Conn{},
		nextConnId: nextConnId,
		acceptq:    make(chan *Conn),	// XXX +buf ?
		txq:        make(chan txReq),
		rxghandoff: make(chan struct{}),
//		axdown:     make(chan struct{}),
		down:       make(chan struct{}),
	}
	if role&linkNoRecvSend == 0 {
		nl.serveWg.Add(2)
		go nl.serveRecv()
		go nl.serveSend()
	}
	return nl
}

// connPool is freelist for Conn.
// XXX make it per-link?
var connPool = sync.Pool{New: func() interface{} {
	return &Conn{
		rxq:    make(chan *pktBuf, 1),	// NOTE non-blocking - see serveRecv XXX +buf ?
		txerr:  make(chan error, 1),	// NOTE non-blocking - see Conn.Send
		txdown: make(chan struct{}),
//		rxdown: make(chan struct{}),
	}
}}

// connAlloc allocates Conn from freelist.
func (link *NodeLink) connAlloc(connId uint32) *Conn {
	c := connPool.Get().(*Conn)
	c.reinit()
	c.link = link
	c.connId = connId
	return c
}

// release releases connection to freelist.
func (c *Conn) release() {
	c.reinit() // XXX just in case
	connPool.Put(c)
}

// reinit reinitializes connection after reallocating it from freelist.
func (c *Conn) reinit() {
	c.link = nil
	c.connId = 0
	// .rxq		- set initially; does not change
	c.rxqWrite.Set(0)   // XXX store relaxed?
	c.rxqRead.Set(0)    // ----//----
	c.rxdownFlag.Set(0) // ----//----

	c.rxerrOnce = sync.Once{} // XXX ok?

	// XXX vvv not strictly needed for light mode?
//	ensureOpen(&c.rxdown)
	c.rxdownOnce = sync.Once{} // XXX ok?
	c.rxclosed.Set(0)

	// .txerr	- never closed

	ensureOpen(&c.txdown)
	c.txdownOnce = sync.Once{} // XXX ok?
	c.txclosed.Set(0)

	c.closeOnce = sync.Once{} // XXX ok?
}

// ensureOpen makes sure *ch stays a non-closed chan struct{} for signalling.
// If it is already closed, the channel is remade.
func ensureOpen(ch *chan struct{}) {
	select {
	case <-*ch:
		*ch = make(chan struct{})
	default:
		// not closed - nothing to do
	}
}

// newConn creates new Conn with id=connId and registers it into connTab.
// must be called with connMu held.
func (link *NodeLink) newConn(connId uint32) *Conn {
	c := link.connAlloc(connId)
	link.connTab[connId] = c
	return c
}

// NewConn creates new connection on top of node-node link.
func (link *NodeLink) NewConn() (*Conn, error) {
	link.connMu.Lock()
	//defer link.connMu.Unlock()
	c, err := link._NewConn()
	link.connMu.Unlock()
	return c, err
}

func (link *NodeLink) _NewConn() (*Conn, error) {
	if link.connTab == nil {
		if link.closed.Get() != 0 {
			return nil, link.err("newconn", ErrLinkClosed)
		}
		return nil, link.err("newconn", ErrLinkDown)
	}

	// nextConnId could wrap around uint32 limits - find first free slot to
	// not blindly replace existing connection
	for i := uint32(0); ; i++ {
		_, exists := link.connTab[link.nextConnId]
		if !exists {
			break
		}
		link.nextConnId += 2

		if i > math.MaxUint32 / 2 {
			return nil, link.err("newconn", ErrLinkManyConn)
		}
	}

	c := link.newConn(link.nextConnId)
	link.nextConnId += 2

	return c, nil
}

// shutdownAX marks acceptq as no longer operational and interrupts Accept.
func (link *NodeLink) shutdownAX() {
	link.axdown1.Do(func() {
//		close(link.axdown)

		link.axdownFlag.Set(1) // XXX cmpxchg and return if already down?

		// drain all connections from .acceptq:
		// - something could be already buffered there
		// - serveRecv could start writing acceptq at the same time we set axdownFlag; we derace it
		for {
			// if serveRecv is outside `.acceptq <- ...` critical
			// region and fully drained - we are done.
			// see description of the logic in shutdownRX
			if link.axqWrite.Get() == 0 && len(link.acceptq) == 0 {
				break
			}

			select {
			case conn := <-link.acceptq:
				// serveRecv already put at least 1 packet into conn.rxq before putting
				// conn into .acceptq - shutting it down will send the error to peer.
				conn.shutdownRX(errConnRefused)

				link.connMu.Lock()
				delete(link.connTab, conn.connId)
				link.connMu.Unlock()

			default:
				// ok - continue spinning
			}
		}

		// wakeup Accepts
		for {
			// similarly to above: .axdownFlag vs .axqRead
			// see logic description in shutdownRX
			if link.axqRead.Get() == 0 {
				break
			}

			select {
			case link.acceptq <- nil:
				// ok - woken up

			default:
				// ok - continue spinning
			}
		}
	})
}

// shutdown closes raw link to peer and marks NodeLink as no longer operational.
//
// it also shuts down all opened connections over this node link.
func (nl *NodeLink) shutdown() {
	nl.shutdownAX()
	nl.downOnce.Do(func() {
		close(nl.down)

		// close actual link to peer. this will wakeup {send,recv}Pkt
		// NOTE we need it here so that e.g. aborting on error in serveSend wakes up serveRecv
		nl.errClose = nl.peerLink.Close()

		nl.downWg.Add(1)
		go func() {
			defer nl.downWg.Done()

			// wait for serve{Send,Recv} to complete before shutting connections down
			//
			// we have to do it so that e.g. serveSend has chance
			// to return last error from sendPkt to requester.
			nl.serveWg.Wait()

			// clear + mark down .connTab + shutdown all connections
			nl.connMu.Lock()
			connTab := nl.connTab
			nl.connTab = nil
			nl.connMu.Unlock()

			// conn.shutdown() outside of link.connMu lock
			for _, conn := range connTab {
				conn.shutdown()
			}
		}()
	})
}

// CloseAccept instructs the node link to stop accepting incoming connections.
//
// Any blocked Accept() will be unblocked and return an error.
// The peer will receive "connection refused" if it tries to connect afterwards,
// and for already-queued connection requests.
//
// It is safe to call CloseAccept several times.
func (link *NodeLink) CloseAccept() {
	link.axclosed.Set(1)
	link.shutdownAX()
}

// Close closes node-node link.
//
// All blocking operations - Accept and IO on associated connections
// established over node link - are automatically interrupted with an error.
// Underlying raw connection is closed.
// It is safe to call Close several times.
func (link *NodeLink) Close() error {
	link.axclosed.Set(1)
	link.closed.Set(1)
	link.shutdown()
	link.downWg.Wait()
	return link.err("close", link.errClose)
}

// shutdown marks connection as no longer operational and interrupts Send and Recv.
func (c *Conn) shutdown() {
	c.shutdownTX()
	c.shutdownRX(errConnClosed)
}

// shutdownTX marks TX as no longer operational and interrupts Send.
func (c *Conn) shutdownTX() {
	c.txdownOnce.Do(func() {
		close(c.txdown)
	})
}

// shutdownRX marks .rxq as no longer operational and interrupts Recv.
func (c *Conn) shutdownRX(errMsg *proto.Error) {
	c.rxdownOnce.Do(func() {
//		close(c.rxdown)	// wakeup Conn.Recv
		c.downRX(errMsg)
	})
}

// downRX marks .rxq as no longer operational.
//
// used in shutdownRX and separately to mark RX down for light Conns.
func (c *Conn) downRX(errMsg *proto.Error) {
	// let serveRecv know RX is down for this connection
	c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?

	// drain all packets from .rxq:
	// - something could be already buffered there
	// - serveRecv could start writing rxq at the same time we set rxdownFlag; we derace it.
	i := 0
	for {
		// we set .rxdownFlag=1 above.
		// now if serveRecv is outside `.rxq <- ...` critical section we know it is either:
		// - before it	-> it will eventually see .rxdownFlag=1 and won't send pkt to rxq.
		// - after it	-> it already sent pkt to rxq and won't touch
		//		   rxq until next packet (where it will hit "before it").
		//
		// when serveRecv stopped sending we know we are done draining when rxq is empty.
		if c.rxqWrite.Get() == 0 && len(c.rxq) == 0 {
			break
		}

		select {
		case <-c.rxq:
			c.rxack()
			i++

		default:
			// ok - continue spinning
		}
	}

	// if something was queued already there - reply "connection closed"
	if i != 0 {
		go c.link.replyNoConn(c.connId, errMsg)
	}

	// wakeup recvPkt(s)
	for {
		// similarly to above:
		// we set .rxdownFlag=1
		// now if recvPkt is outside `... <- .rxq` critical section we know that it is either:
		// - before it	-> it will eventually see .rxdownFlag=1 and won't try to read rxq.
		// - after it	-> it already read pkt from rxq and won't touch
		//                 rxq until next recvPkt (where it will hit "before it").
		if c.rxqRead.Get() == 0 {
			break
		}

		select {
		case c.rxq <- nil:
			// ok - woken up

		default:
			// ok - continue spinning
		}
	}
}


// time to keep record of a closed connection so that we can properly reply
// "connection closed" if a packet comes in with same connID.
var connKeepClosed = 1 * time.Minute

// CloseRecv closes the reading end of the connection.
//
// Any blocked Recv*() will be unblocked and return an error.
// The peer will receive "connection closed" if it tries to send anything
// afterwards, and for messages already in the local rx queue.
//
// It is safe to call CloseRecv several times.
func (c *Conn) CloseRecv() {
	c.rxclosed.Set(1)
	c.shutdownRX(errConnClosed)
}

// Close closes connection.
//
// Any blocked Send*() or Recv*() will be unblocked and return error.
//
// NOTE for Send() - once transmission has started - it will complete in the
// background on the wire so as not to break node-node link framing.
//
// It is safe to call Close several times.
func (c *Conn) Close() error {
	link := c.link
	c.closeOnce.Do(func() {
		c.rxclosed.Set(1)
		c.txclosed.Set(1)
		c.shutdown()

		// adjust link.connTab
		var tmpclosed *Conn
		link.connMu.Lock()
		if link.connTab != nil {
			// connection was initiated by us - simply delete - we always
			// know if a packet comes to such connection - it is closed.
			//
			// XXX checking vvv should be possible without connMu lock
			if c.connId % 2 == link.nextConnId % 2 {
				delete(link.connTab, c.connId)

			// connection was initiated by peer which we accepted - put special
			// "closed" connection into connTab entry for some time to reply
			// "connection closed" if another packet comes to it.
			//
			// ( we cannot reuse same connection since after it is marked as
			//   closed Send refuses to work )
			} else {
				// delete(link.connTab, c.connId)
				// XXX vvv was temp. disabled - costs a lot in 1req=1conn model

				// c implicitly goes away from connTab
				tmpclosed = link.newConn(c.connId)
			}
		}
		link.connMu.Unlock()

		if tmpclosed != nil {
			tmpclosed.shutdownRX(errConnClosed)

			time.AfterFunc(connKeepClosed, func() {
				link.connMu.Lock()
				delete(link.connTab, c.connId)
				link.connMu.Unlock()
			})
		}
	})

	return nil
}

// ---- receive ----

// errAcceptShutdownAX returns appropriate error when link.axdown is found ready in Accept.
func (link *NodeLink) errAcceptShutdownAX() error {
	switch {
	case link.closed.Get() != 0:
		return ErrLinkClosed

	case link.axclosed.Get() != 0:
		return ErrLinkNoListen

	default:
		// XXX do the same as in errRecvShutdown (check link.errRecv)
		return ErrLinkDown
	}
}

// Accept waits for and accepts incoming connection on top of node-node link.
func (link *NodeLink) Accept() (*Conn, error) {
	// semantically equivalent to the following:
	// ( this is hot path for py compatibility mode because new connection
	//   is established in every message and select hurts performance )
	//
	// select {
	// case <-link.axdown:
	// 	return nil, link.err("accept", link.errAcceptShutdownAX())
	//
	// case c := <-link.acceptq:
	// 	return c, nil
	// }

	var conn *Conn
	var err  error

	link.axqRead.Add(1)
	axdown := link.axdownFlag.Get() != 0
	if !axdown {
		conn = <-link.acceptq
	}
	link.axqRead.Add(-1)

	// in contrast to recvPkt we can decide about error after releasing axqRead
	// reason: link is not going to be released to a free pool.
	if axdown || conn == nil {
		err = link.err("accept", link.errAcceptShutdownAX())
	}

	return conn, err
}

// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt.
func (c *Conn) errRecvShutdown() error {
	switch {
	case c.rxclosed.Get() != 0:
		return ErrClosedConn

	case c.link.closed.Get() != 0:
		return ErrLinkClosed

	default:
		// we have to check what the particular RX error was on nodelink shutdown.
		// only do that once - after reporting the RX error the first time,
		// tell the client the node link is no longer operational.
		var err error
		c.rxerrOnce.Do(func() {
			c.link.errMu.Lock()
			err = c.link.errRecv
			c.link.errMu.Unlock()
		})
		if err == nil {
			err = ErrLinkDown
		}
		return err
	}
}

// recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*pktBuf, error) {
	// semantically equivalent to the following:
	// (this is hot path and select is not used for performance reason)
	//
	// select {
	// case <-c.rxdown:
	// 	return nil, c.err("recv", c.errRecvShutdown())
	//
	// case pkt := <-c.rxq:
	// 	return pkt, nil
	// }

	var pkt *pktBuf
	var err error

	c.rxqRead.Add(1)
	rxdown := c.rxdownFlag.Get() != 0
	if !rxdown {
		pkt = <-c.rxq
	}

	// decide about error while under rxqRead (if after - the Conn can go away to be released)
	if rxdown || pkt == nil {
		err = c.err("recv", c.errRecvShutdown())
	}

	c.rxqRead.Add(-1)
	if err == nil {
		c.rxack()
	}
	return pkt, err
}

// rxack unblocks serveRecv after it handed G to us.
// see comments about rxghandoff in serveRecv.
func (c *Conn) rxack() {
	if !rxghandoff {
		return
	}
	//fmt.Printf("conn: rxack <- ...\n")
	c.link.rxghandoff <- struct{}{}
	//fmt.Printf("\tconn: rxack <- ... ok\n")
}

// serveRecv handles incoming packets, routing them either to the appropriate
// already-established connection or, if the node link is accepting incoming
// connections, to a new connection put into the accept queue.
func (nl *NodeLink) serveRecv() {
	defer nl.serveWg.Done()
	for {
		// receive 1 packet
		// NOTE if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
		pkt, err := nl.recvPkt()
		//fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err)
		if err != nil {
			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.

			nl.errMu.Lock()
			nl.errRecv = err
			nl.errMu.Unlock()

			nl.shutdown()
			return
		}

		// pkt.ConnId -> Conn
		connId := packed.Ntoh32(pkt.Header().ConnId)
		accept := false

		nl.connMu.Lock()

		// connTab is never nil here - because shutdown, before
		// resetting it, waits for us to finish.
		conn := nl.connTab[connId]

		if conn == nil {
			// message with connid for a stream initiated by peer
			// it will be considered to be accepted (not if .axdown)
			if connId % 2 != nl.nextConnId % 2 {
				accept = true
				conn = nl.newConn(connId)
			}

			// else it is message with connid that should be initiated by us
			// leave conn=nil - we'll reply errConnClosed
		}

		nl.connMu.Unlock()

		//fmt.Printf("%p\tconn: %v\n", nl, conn)
		if conn == nil {
			// see ^^^ "message with connid that should be initiated by us"
			go nl.replyNoConn(connId, errConnClosed)
			continue
		}

		// route packet to serving goroutine handler
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let whole link starve because of one connection)
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for incoming connection that is not yet
		// there in acceptq.
		conn.rxqWrite.Set(1)
		rxdown := conn.rxdownFlag.Get() != 0
		if !rxdown {
			conn.rxq <- pkt
		}
		conn.rxqWrite.Set(0)

		//fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept)


		// conn exists, but rx is down - "connection closed"
		// (this cannot happen for newly accepted connection)
		if rxdown {
			go nl.replyNoConn(connId, errConnClosed)
			continue
		}

		// this packet established new connection - try to accept it
		if accept {
			nl.axqWrite.Set(1)
			axdown := nl.axdownFlag.Get() != 0
			if !axdown {
				nl.acceptq <- conn
			}
			nl.axqWrite.Set(0)

			// we are not accepting the connection
			if axdown {
				go nl.replyNoConn(connId, errConnRefused)
				nl.connMu.Lock()
				delete(nl.connTab, conn.connId)
				nl.connMu.Unlock()

				continue
			}
		}

		//fmt.Printf("%p\tafter accept\n", nl)

		// Normally the serveRecv G will continue to run, with the G woken up
		// on rxq/acceptq only being put into the runqueue of the current proc.
		// By default the proc runq will execute only when serveRecv blocks
		// again next time deep in nl.recvPkt(), but let's force the switch
		// now without additional waiting to reduce latency.

		// XXX bad - puts serveRecv to global runq thus with high p to switch M
		//runtime.Gosched()

		// handoff execution to receiving goroutine via channel.
		// rest of serveRecv is put to current P local runq.
		//
		// details:
		// - https://github.com/golang/go/issues/20168
		// - https://github.com/golang/go/issues/15110
		//
		// see BenchmarkTCPlo* - there serveRecv-style RX handoff cuts RTT
		// from 12.5μs to 6.6μs (RTT without a serveRecv-style G is 4.8μs)
		//
		// TODO report upstream
		if rxghandoff {
			//fmt.Printf("serveRecv: <-rxghandoff\n")
			<-nl.rxghandoff
			//fmt.Printf("\tserveRecv: <-rxghandoff ok\n")
		}

/*
		// XXX goes away in favour of .rxdownFlag; reasons
		// - no need to reallocate rxdown for light conn
		// - no select
		//
		// XXX review synchronization via flags for correctness (e.g.
		// if both G were on the same runqueue, spinning in G1 will
		// prevent G2 progress)
		//
		// XXX maybe we'll need select if we add ctx into Send/Recv.

		// don't even try `conn.rxq <- ...` if conn.rxdown is ready
		// ( else since select is picking random ready variant Recv/serveRecv
		//   could receive something on rxdown Conn sometimes )
		rxdown := false
		select {
		case <-conn.rxdown:
			rxdown = true
		default:
			// ok
		}

		// route packet to serving goroutine handler
		//
		// TODO backpressure when Recv is not keeping up with Send on peer side?
		//      (not to let whole link starve because of one connection)
		//
		// NOTE rxq must be buffered with at least 1 element so that
		// queuing pkt succeeds for incoming connection that is not yet
		// there in acceptq.
		if !rxdown {
			// XXX can avoid select here: if conn closer cares to drain rxq (?)
			select {
			case <-conn.rxdown:
				rxdown = true

			case conn.rxq <- pkt:
				// ok
			}
		}

		...

		// this packet established new connection - try to accept it
		if accept {
			// don't even try `link.acceptq <- ...` if link.axdown is ready
			// ( else since select is picking random ready variant Accept/serveRecv
			//   could receive something on axdown Link sometimes )
			axdown := false
			select {
			case <-nl.axdown:
				axdown = true

			default:
				// ok
			}

			// put conn to .acceptq
			if !axdown {
				// XXX can avoid select here if shutdownAX cares to drain acceptq (?)
				select {
				case <-nl.axdown:
					axdown = true

				case nl.acceptq <- conn:
					//fmt.Printf("%p\t.acceptq <- conn  ok\n", nl)
					// ok
				}
			}

			// we are not accepting the connection
			if axdown {
				conn.shutdownRX(errConnRefused)
				nl.connMu.Lock()
				delete(nl.connTab, conn.connId)
				nl.connMu.Unlock()
			}
		}
*/
	}
}

// ---- network replies for closed / refused connections ----

var errConnClosed  = &proto.Error{proto.PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &proto.Error{proto.PROTOCOL_ERROR, "connection refused"}

// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (link *NodeLink) replyNoConn(connId uint32, errMsg proto.Msg) {
	//fmt.Printf("%s .%d: -> replyNoConn %v\n", link, connId, errMsg)
	link.sendMsg(connId, errMsg) // ignore errors
	//fmt.Printf("%s .%d: replyNoConn(%v) -> %v\n", link, connId, errMsg, err)
}

// ---- transmit ----

// txReq is request to transmit a packet. Result error goes back to errch.
type txReq struct {
	pkt   *pktBuf
	errch chan error
}

// errSendShutdown returns appropriate error when c.txdown is found ready in Send.
func (c *Conn) errSendShutdown() error {
	switch {
	case c.txclosed.Get() != 0:
		return ErrClosedConn

	// the only other error possible besides Conn being .Close()'ed is that
	// the NodeLink was closed/shut down itself - on actual IO problems the
	// corresponding error is delivered to the particular Send that caused it.

	case c.link.closed.Get() != 0:
		return ErrLinkClosed

	default:
		return ErrLinkDown
	}
}

// sendPkt sends raw packet via connection.
//
// on success pkt is freed.
func (c *Conn) sendPkt(pkt *pktBuf) error {
	err := c.sendPkt2(pkt)
	return c.err("send", err)
}

func (c *Conn) sendPkt2(pkt *pktBuf) error {
	// connId must be set to one associated with this connection
	if pkt.Header().ConnId != packed.Hton32(c.connId) {
		panic("Conn.sendPkt: connId wrong")
	}

	var err error

	select {
	case <-c.txdown:
		return c.errSendShutdown()

	case c.link.txq <- txReq{pkt, c.txerr}:
		select {
		// tx request was sent to serveSend and is being transmitted on the wire.
		// the transmission may block for an indefinitely long time though, and
		// we cannot interrupt it as the only way to interrupt is
		// .link.Close() which will close all other Conns.
		//
		// That's why we are also checking for c.txdown while waiting
		// for reply from serveSend (and leave pkt to finish transmitting).
		//
		// NOTE after we return straight here serveSend won't be later
		// blocked on c.txerr<- because that backchannel is a non-blocking one.
		case <-c.txdown:

			// also poll c.txerr here because: when there is TX error,
			// serveSend sends to c.txerr _and_ closes c.txdown .
			// We still want to return actual transmission error to caller.
			select {
			case err = <-c.txerr:
				return err
			default:
				return c.errSendShutdown()
			}

		case err = <-c.txerr:
			return err
		}
	}
}

// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
	defer nl.serveWg.Done()
	for {
		select {
		case <-nl.down:
			return

		case txreq := <-nl.txq:
			// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
			err := nl.sendPkt(txreq.pkt)
			//fmt.Printf("sendPkt -> %v\n", err)

			// FIXME if several goroutines call conn.Send
			// simultaneously - c.txerr, even if buffered(1), will
			// overflow and thus deadlock here.
			//
			// -> require "Conn.Send must not be used concurrently"?
			txreq.errch <- err

			// on IO error framing over peerLink becomes broken
			// so we shut down node link and all connections over it.
			//
			// XXX dup wrt sendPktDirect
			// XXX move to link.sendPkt?
			if err != nil {
				nl.shutdown()
				return
			}
		}
	}
}

// ---- transmit direct mode ----

// serveSend is not strictly needed - net.Conn.Write already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see poll.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
//
// thus the only reason we use serveSend is so that Conn.Close can "interrupt" Conn.Send via .txdown .
// however this adds overhead and is not needed in light mode.

// sendPktDirect sends raw packet with appropriate connection ID directly via link.
func (c *Conn) sendPktDirect(pkt *pktBuf) error {
	// set pkt connId associated with this connection
	pkt.Header().ConnId = packed.Hton32(c.connId)

	// NOTE if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
	err := c.link.sendPkt(pkt)
	//fmt.Printf("sendPkt -> %v\n", err)

	// on IO error framing over peerLink becomes broken
	// so we shut down node link and all connections over it.
	//
	// XXX dup wrt serveSend
	// XXX move to link.sendPkt?
	if err != nil {
		c.link.shutdown()
	}

	return err
}


// ---- raw IO ----

const dumpio = false

// sendPkt sends raw packet to peer.
//
// tx error, if any, is returned as is and is analyzed in serveSend.
//
// XXX pkt should be freed always or only on error?
func (nl *NodeLink) sendPkt(pkt *pktBuf) error {
	if dumpio {
		// XXX -> log
		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
		//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
	}

	// NOTE Write writes data in full, or it is error
	_, err := nl.peerLink.Write(pkt.data)
	pkt.Free()
	return err
}

var ErrPktTooBig = errors.New("packet too big")

// recvPkt receives raw packet from peer.
//
// rx error, if any, is returned as is and is analyzed in serveRecv
//
// XXX dup in ZEO.
func (nl *NodeLink) recvPkt() (*pktBuf, error) {
	// FIXME if rxbuf is non-empty - first look there for header and then if
	// we know size -> allocate pkt with that size.
	pkt := pktAlloc(4096)
	// len=4K but cap can be more since pkt is from pool - use all space to buffer reads
	// XXX vvv -> pktAlloc() ?
	data := pkt.data[:cap(pkt.data)]

	n := 0 // number of pkt bytes obtained so far

	// next packet could be already prefetched in part by previous read
	if nl.rxbuf.Len() > 0 {
		δn, _ := nl.rxbuf.Read(data[:proto.PktHeaderLen])
		n += δn
	}

	// first read to read pkt header and hopefully rest of packet in 1 syscall
	if n < proto.PktHeaderLen {
		δn, err := io.ReadAtLeast(nl.peerLink, data[n:], proto.PktHeaderLen - n)
		if err != nil {
			return nil, err
		}
		n += δn
	}

	pkth := pkt.Header()

	msgLen := packed.Ntoh32(pkth.MsgLen)
	if msgLen > proto.PktMaxSize - proto.PktHeaderLen {
		return nil, ErrPktTooBig
	}
	pktLen := int(proto.PktHeaderLen + msgLen) // whole packet length

	// resize data if we don't have enough room in it
	data = xbytes.Resize(data, pktLen)
	data = data[:cap(data)]

	// we might have more data already prefetched in rxbuf
	if nl.rxbuf.Len() > 0 {
		δn, _ := nl.rxbuf.Read(data[n:pktLen])
		n += δn
	}

	// read rest of pkt data, if we need to
	if n < pktLen {
		δn, err := io.ReadAtLeast(nl.peerLink, data[n:], pktLen - n)
		if err != nil {
			return nil, err
		}
		n += δn
	}

	// put overread data into rxbuf for next reader
	if n > pktLen {
		nl.rxbuf.Write(data[pktLen:n])
	}

	// fixup data/pkt
	data = data[:n]
	pkt.data = data

	if dumpio {
		// XXX -> log
		fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
	}

	return pkt, nil
}


// ---- for convenience: Conn -> NodeLink & local/remote link addresses  ----

// LocalAddr returns local address of the underlying link to peer.
func (link *NodeLink) LocalAddr() net.Addr {
	return link.peerLink.LocalAddr()
}

// RemoteAddr returns remote address of the underlying link to peer.
func (link *NodeLink) RemoteAddr() net.Addr {
	return link.peerLink.RemoteAddr()
}

// Link returns underlying NodeLink of this connection.
func (c *Conn) Link() *NodeLink {
	return c.link
}

// ConnID returns connection identifier used for the connection.
func (c *Conn) ConnID() uint32 {
	return c.connId
}


// ---- for convenience: String / Error / Cause ----

func (link *NodeLink) String() string {
	s := fmt.Sprintf("%s - %s", link.LocalAddr(), link.RemoteAddr())
	return s	// XXX add "(closed)" if link is closed ?
			// XXX other flags e.g. (down) ?
}

func (c *Conn) String() string {
	s := fmt.Sprintf("%s .%d", c.link, c.connId)
	return s	// XXX add "(closed)" if c is closed ?
}

func (e *LinkError) Error() string {
	return fmt.Sprintf("%s: %s: %s", e.Link, e.Op, e.Err)
}

func (e *ConnError) Error() string {
	return fmt.Sprintf("%s .%d: %s: %s", e.Link, e.ConnId, e.Op, e.Err)
}

func (e *LinkError) Cause() error { return e.Err }
func (e *ConnError) Cause() error { return e.Err }

func (nl *NodeLink) err(op string, e error) error {
	if e == nil {
		return nil
	}
	return &LinkError{Link: nl, Op: op, Err: e}
}

func (c *Conn) err(op string, e error) error {
	if e == nil {
		return nil
	}
	return &ConnError{Link: c.link, ConnId: c.connId, Op: op, Err: e}
}


// ---- exchange of messages ----

//trace:event traceMsgRecv(c *Conn, msg proto.Msg)
//trace:event traceMsgSendPre(l *NodeLink, connId uint32, msg proto.Msg)
// XXX do we also need traceConnSend?

// msgPack allocates pktBuf and encodes msg into it.
func msgPack(connId uint32, msg proto.Msg) *pktBuf {
	l := msg.NEOMsgEncodedLen()
	buf := pktAlloc(proto.PktHeaderLen + l)

	h := buf.Header()
	h.ConnId = packed.Hton32(connId)
	h.MsgCode = packed.Hton16(msg.NEOMsgCode())
	h.MsgLen = packed.Hton32(uint32(l)) // XXX casting: think again

	msg.NEOMsgEncode(buf.Payload())
	return buf
}

// TODO msgUnpack

// Recv receives message from the connection.
func (c *Conn) Recv() (proto.Msg, error) {
	pkt, err := c.recvPkt()
	if err != nil {
		return nil, err
	}
	//defer pkt.Free()
	msg, err := c._Recv(pkt)
	pkt.Free()
	return msg, err
}

func (c *Conn) _Recv(pkt *pktBuf) (proto.Msg, error) {
	// decode packet
	pkth := pkt.Header()
	msgCode := packed.Ntoh16(pkth.MsgCode)
	msgType := proto.MsgType(msgCode)
	if msgType == nil {
		err := fmt.Errorf("invalid msgCode (%d)", msgCode)
		// XXX "decode" -> "recv: decode"?
		return nil, c.err("decode", err)
	}

	// TODO use free-list for decoded messages + when possible decode in-place
	msg := reflect.New(msgType).Interface().(proto.Msg)
//	msg := reflect.NewAt(msgType, bufAlloc(msgType.Size())


	_, err := msg.NEOMsgDecode(pkt.Payload())
	if err != nil {
		return nil, c.err("decode", err) // XXX "decode:" is already in ErrDecodeOverflow
	}

	traceMsgRecv(c, msg)
	return msg, nil
}

// sendMsg sends message with specified connection ID.
//
// it encodes message into packet, sets header appropriately and sends it.
//
// it is ok to call sendMsg in parallel with serveSend. XXX link to sendPktDirect for rationale?
func (link *NodeLink) sendMsg(connId uint32, msg proto.Msg) error {
	traceMsgSendPre(link, connId, msg)

	buf := msgPack(connId, msg)
	return link.sendPkt(buf) // XXX more context in err? (msg type)
	// FIXME ^^^ shutdown whole link on error
}

// Send sends message over the connection.
func (c *Conn) Send(msg proto.Msg) error {
	traceMsgSendPre(c.link, c.connId, msg)

	buf := msgPack(c.connId, msg)
	return c.sendPkt(buf)		// XXX more context in err? (msg type)
}

func (c *Conn) sendMsgDirect(msg proto.Msg) error {
	return c.link.sendMsg(c.connId, msg)
}


// Expect receives a message and checks it is one of the expected types.
//
// If the check is successful, the message is decoded in place into the
// matching entry of msgv, and the returned which is the index of the received
// message in msgv.
//
// On error (-1, err) is returned.
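//
// For example, to wait for either a concrete response or proto.Error
// (a sketch; resp denotes a value of a concrete proto.Msg type):
//
//	nerr := &proto.Error{}
//	which, err := c.Expect(resp, nerr)
//	// which == 0: resp was received and decoded in place
//	// which == 1: the peer replied with proto.Error (decoded into nerr)
//	// else:       err describes the problem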
func (c *Conn) Expect(msgv ...proto.Msg) (which int, err error) {
	// XXX a bit dup wrt Recv
	pkt, err := c.recvPkt()
	if err != nil {
		return -1, err
	}
	//defer pkt.Free()
	which, err = c._Expect(pkt, msgv...)
	pkt.Free()
	return which, err
}

func (c *Conn) _Expect(pkt *pktBuf, msgv ...proto.Msg) (int, error) {
	pkth := pkt.Header()
	msgCode := packed.Ntoh16(pkth.MsgCode)

	for i, msg := range msgv {
		if msg.NEOMsgCode() == msgCode {
			_, err := msg.NEOMsgDecode(pkt.Payload())
			if err != nil {
				return -1, c.err("decode", err)
			}
			return i, nil
		}
	}

	// unexpected message
	msgType := proto.MsgType(msgCode)
	if msgType == nil {
		return -1, c.err("decode", fmt.Errorf("invalid msgCode (%d)", msgCode))
	}

	// XXX also add which messages were expected ?
	return -1, c.err("recv", fmt.Errorf("unexpected message: %v", msgType))
}

// Ask sends request and receives a response.
//
// It expects the response to be either of resp's type or proto.Error:
//
// If a resp-type message is received, it is decoded in place and nil is returned.
// If a proto.Error message is received, it is returned as the error.
//
// Otherwise the returned error describes the problem.
//
// XXX return proto.Error explicitly?
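//
// For example (a sketch; req and resp denote values of concrete NEO message
// types - a question message and its corresponding answer message):
//
//	err := c.Ask(req, resp)
//	// err == nil            - resp was filled with the peer's answer
//	// err is a *proto.Error - the peer replied with an error
//	// otherwise             - err describes e.g. a transmission or decode problem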
func (c *Conn) Ask(req proto.Msg, resp proto.Msg) error {
	err := c.Send(req)
	if err != nil {
		return err
	}

	nerr := &proto.Error{}
	which, err := c.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return nerr
	}

	return err
}

// ---- exchange of 1-1 request-reply ----
// (impedance matcher for current neo/py implementation)

// lightClose closes a light connection.
//
// There must be no Send or Recv in flight.
// The caller must not use c after the call to lightClose - the connection is
// returned to the freelist.
//
// Must be called only once.
func (c *Conn) lightClose() {
	nl := c.link
	releaseok := false
	nl.connMu.Lock()
	if nl.connTab != nil {
		// XXX find way to keep initiated by us conn as closed for some time (see Conn.Close)
		// but timer costs too much...
		delete(nl.connTab, c.connId)
		releaseok = true
	}
	nl.connMu.Unlock()

	// we can release c only if we removed it from connTab.
	//
	// if not - the scenario could be: nl.shutdown sets nl.connTab=nil and
	// then iterates connTab snapshot taken under nl.connMu lock. If so
	// this activity (which calls e.g. Conn.shutdown) could be running with
	// us in parallel.
	if releaseok {
		c.release()
	}
}

// Request is a message received from the link + (internally) connection handle to make a reply.
//
// Request represents 1 request - 0|1 reply interaction model.
//
// See "Lightweight mode" in top-level package doc for overview.
type Request struct {
	Msg  proto.Msg
	conn *Conn
}

// Recv1 accepts a connection with the first message peer sent us when establishing it.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Recv1() (Request, error) {
	conn, err := link.Accept()
	if err != nil {
		return Request{}, err // XXX or return *Request? (want to avoid alloc)
	}

	// NOTE serveRecv guarantees that when a conn is accepted, there is 1 message in conn.rxq
	msg, err := conn.Recv() // XXX better directly from <-rxq ?
	if err != nil {
		conn.Close() // XXX -> conn.lightClose()
		return Request{}, err
	}

	// no one will be reading from conn anymore - mark rx down so that if the
	// peer sends another packet with the same .connId, serveRecv does not
	// deadlock trying to put it into conn.rxq.
	conn.downRX(errConnClosed)

	return Request{Msg: msg, conn: conn}, nil
}

// Reply sends response to request.
//
// See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Reply(resp proto.Msg) error {
	return req.conn.sendMsgDirect(resp)
	//err1 := req.conn.Send(resp)
	//err2 := req.conn.Close()	// XXX no - only Send here?
	//return xerr.First(err1, err2)
}

// Close must be called to free request resources.
//
// Close must be called exactly once.
// The request object cannot be used any more after call to Close.
//
// See "Lightweight mode" in top-level package doc for overview.
func (req *Request) Close() { // XXX +error?
	// XXX req.Msg.Release() ?
	req.Msg = nil
	req.conn.lightClose()
	req.conn = nil // just in case
}


// Send1 sends one message over a new connection.
//
// It creates the new connection itself internally, and shuts it down after
// transmission completes.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Send1(msg proto.Msg) error {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

	conn.downRX(errConnClosed) // XXX just initially create conn this way

	err = conn.sendMsgDirect(msg)
	conn.lightClose()
	return err
}


// Ask1 sends request and receives response in 1-1 model.
//
// See Conn.Ask for semantic details.
//
// See "Lightweight mode" in top-level package doc for overview.
func (link *NodeLink) Ask1(req proto.Msg, resp proto.Msg) (err error) {
	conn, err := link.NewConn()
	if err != nil {
		return err
	}

	//defer conn.lightClose()
	err = conn._Ask1(req, resp)
	conn.lightClose()
	return err
}

func (conn *Conn) _Ask1(req proto.Msg, resp proto.Msg) error {
	err := conn.sendMsgDirect(req)
	if err != nil {
		return err
	}

	nerr := &proto.Error{}
	which, err := conn.Expect(resp, nerr)
	switch which {
	case 0:
		return nil
	case 1:
		return nerr
	}

	return err
}

// Link returns the NodeLink over which the request was received.
func (req *Request) Link() *NodeLink {
	return req.conn.Link()
}