Commit 9c1443ba authored by Kirill Smelkov

.

parent 4732ff17
@@ -51,7 +51,6 @@ type NodeLink struct {
peerLink net.Conn // raw conn to peer
connMu sync.Mutex // TODO -> RW ?
//connMu debug.Mutex // TODO -> RW ?
connTab map[uint32]*Conn // connId -> Conn associated with connId
nextConnId uint32 // next connId to use for Conn initiated by us
@@ -90,6 +89,12 @@ type Conn struct {
closed chan struct{}
}
// ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link is closed") // XXX -> read/write but also Accept ?
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("read/write on closed connection")
// LinkRole is a role an end of NodeLink is intended to play
type LinkRole int
const (
@@ -144,6 +149,32 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
return nl
}
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf), // TODO buffering
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
closed: make(chan struct{}),
}
nl.connTab[connId] = c
return c
}
// NewConn creates new connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
nl.connMu.Lock()
defer nl.connMu.Unlock()
if nl.connTab == nil {
// XXX -> error (because NodeLink can become "closed" due to IO errors ?
panic("NewConn() on closed node-link")
}
c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2
return c
}
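NewConn always advances nextConnId by 2, which suggests the two ends of a link hand out connection ids from disjoint parities (presumably selected via LinkRole, which this hunk does not show), so both sides can create connections without coordinating. A minimal standalone sketch of such an allocation scheme; the names and the even/odd assignment below are assumptions for illustration, not code from this commit:

package main

import (
	"fmt"
	"sync"
)

// idAlloc hands out connection ids of a single parity so two peers never collide.
type idAlloc struct {
	mu   sync.Mutex
	next uint32 // assumed: 0 on the initiating side, 1 on the accepting side
}

func (a *idAlloc) New() uint32 {
	a.mu.Lock()
	defer a.mu.Unlock()
	id := a.next
	a.next += 2 // keep the parity, mirroring nextConnId += 2 above
	return id
}

func main() {
	client := idAlloc{next: 0}
	server := idAlloc{next: 1}
	fmt.Println(client.New(), client.New()) // 0 2
	fmt.Println(server.New(), server.New()) // 1 3
}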
// close is worker for Close & friends.
// It marks all active Conns and NodeLink itself as closed.
func (nl *NodeLink) close() {
@@ -177,105 +208,27 @@ func (nl *NodeLink) Close() error {
return nl.errClose
}
// sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if false {
// XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
}
// NOTE Write writes data in full, or it is error
_, err := nl.peerLink.Write(pkt.Data)
return err
}
var ErrPktTooSmall = errors.New("packet too small")
var ErrPktTooBig = errors.New("packet too big")
// recvPkt receives raw packet from peer
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...)
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
if err != nil {
return nil, err
}
pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen {
return nil, ErrPktTooSmall // length is a whole packet len with header
}
if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
return nil, ErrPktTooBig
}
// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len))
if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
// grow rxbuf
rxbuf2 := make([]byte, ntoh32(pkth.Len))
copy(rxbuf2, pkt.Data[:n])
pkt.Data = rxbuf2
}
// cut .Data len to length of packet
pkt.Data = pkt.Data[:ntoh32(pkth.Len)]
// read rest of pkt data, if we need to
if n < len(pkt.Data) {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
if err != nil {
return nil, err
}
}
if false {
// XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
}
return pkt, nil
}
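recvPkt above implements plain length-prefixed framing: read a fixed-size header in full, validate the advertised total length against lower and upper bounds, grow the receive buffer when the packet exceeds the initial 4096 bytes, then read the remainder. A self-contained sketch of the same technique over an arbitrary io.Reader; the 8-byte header (connId plus total length) and the constants here are made up for illustration and do not reflect the real PktHead layout:

package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

const headLen = 8           // assumed header: 4 bytes connId + 4 bytes total length
const maxPacket = 0x4000000 // upper bound on advertised length, in the spirit of MAX_PACKET_SIZE

var errTooSmall = errors.New("packet too small")
var errTooBig = errors.New("packet too big")

// readPkt reads one length-prefixed packet: header first, then the rest of the payload.
func readPkt(r io.Reader) ([]byte, error) {
	buf := make([]byte, 4096)
	if _, err := io.ReadFull(r, buf[:headLen]); err != nil {
		return nil, err
	}
	total := binary.BigEndian.Uint32(buf[4:8]) // length counts the header itself
	if total < headLen {
		return nil, errTooSmall
	}
	if total > maxPacket {
		return nil, errTooBig
	}
	if total > uint32(cap(buf)) {
		grown := make([]byte, total) // grow the rx buffer, keeping the header already read
		copy(grown, buf[:headLen])
		buf = grown
	}
	buf = buf[:total]
	if _, err := io.ReadFull(r, buf[headLen:]); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	pkt := make([]byte, headLen+5)
	binary.BigEndian.PutUint32(pkt[0:4], 1)                // connId
	binary.BigEndian.PutUint32(pkt[4:8], uint32(len(pkt))) // total length
	copy(pkt[headLen:], "hello")
	got, err := readPkt(bytes.NewReader(pkt))
	fmt.Println(string(got[headLen:]), err) // hello <nil>
}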
// newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf), // TODO buffering
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
closed: make(chan struct{}),
}
nl.connTab[connId] = c
return c
}
// worker for Close() & co
func (c *Conn) close() {
c.closeOnce.Do(func() {
close(c.closed)
})
}
// NewConn creates new connection on top of node-node link
func (nl *NodeLink) NewConn() *Conn {
nl.connMu.Lock()
defer nl.connMu.Unlock()
if nl.connTab == nil {
// XXX -> error (because NodeLink can become "closed" due to IO errors ?
panic("NewConn() on closed node-link")
}
c := nl.newConn(nl.nextConnId)
nl.nextConnId += 2
return c
}
// Close closes connection
// Any blocked Send() or Recv() will be unblocked and return error
//
// NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break framing.
func (c *Conn) Close() error {
// adjust nodeLink.connTab
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
c.close()
return nil
}
// ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link is closed")
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
// Accept waits for and accepts incoming connection on top of node-node link
func (nl *NodeLink) Accept() (*Conn, error) {
// this node link is not accepting connections
@@ -292,9 +245,22 @@ func (nl *NodeLink) Accept() (*Conn, error) {
}
}
// Recv receives packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
select {
case <-c.closed:
// XXX get err from c.nodeLink.recvErr
// XXX if nil -> ErrClosedConn ?
return nil, ErrClosedConn // XXX -> EOF ?
case pkt := <-c.rxq: // XXX try to leave only pkt, ok := <-c.rxq
return pkt, nil // XXX error = ?
}
}
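Recv blocks on the per-connection rx queue and, in the same select, on the closed channel, which is how Close unblocks a pending receiver. A standalone sketch of that pattern with simplified stand-ins for Conn and PktBuf (names here are illustrative, not from this commit):

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

var errClosed = errors.New("read on closed connection")

// conn is a stripped-down stand-in for Conn: an rx queue plus a close signal.
type conn struct {
	rxq       chan []byte
	closed    chan struct{}
	closeOnce sync.Once
}

func newConn() *conn {
	return &conn{rxq: make(chan []byte), closed: make(chan struct{})}
}

// Recv returns the next packet, or errClosed once Close has been called.
func (c *conn) Recv() ([]byte, error) {
	select {
	case <-c.closed:
		return nil, errClosed
	case pkt := <-c.rxq:
		return pkt, nil
	}
}

// Close is idempotent: sync.Once guards closing the signal channel,
// like the close() worker above.
func (c *conn) Close() { c.closeOnce.Do(func() { close(c.closed) }) }

func main() {
	c := newConn()
	go func() {
		time.Sleep(10 * time.Millisecond)
		c.Close() // unblocks the Recv below
	}()
	pkt, err := c.Recv()
	fmt.Println(pkt, err) // [] read on closed connection
}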
// serveRecv handles incoming packets routing them to either appropriate
// already-established connection or to new handling goroutine. XXX Accept
// already-established connection or, if node link is accepting incoming
// connections, to new connection put to accept queue.
func (nl *NodeLink) serveRecv() {
defer nl.serveWg.Done()
for {
@@ -363,6 +329,51 @@ type txReq struct {
errch chan error
}
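The routing described for serveRecv, where a single receive loop demultiplexes packets into per-connection rx queues keyed by connId and hands previously unseen ids to Accept, can be sketched on its own roughly as below; the types, buffer sizes and the accept queue are simplifying assumptions, not this commit's actual code:

package main

import (
	"fmt"
	"sync"
)

// packet is a stand-in for PktBuf; ConnId selects the destination connection.
type packet struct {
	ConnId uint32
	Data   string
}

// mux routes packets from one receive loop into per-connection queues,
// mirroring the connTab lookup serveRecv is described to perform.
type mux struct {
	mu      sync.Mutex
	connTab map[uint32]chan packet
	acceptq chan chan packet // freshly created connections are handed to Accept here
}

func (m *mux) route(pkt packet) {
	m.mu.Lock()
	rxq, ok := m.connTab[pkt.ConnId]
	if !ok {
		rxq = make(chan packet, 1)
		m.connTab[pkt.ConnId] = rxq
	}
	m.mu.Unlock()
	if !ok {
		m.acceptq <- rxq // new incoming connection
	}
	rxq <- pkt
}

func main() {
	m := &mux{connTab: make(map[uint32]chan packet), acceptq: make(chan chan packet, 1)}
	go m.route(packet{ConnId: 1, Data: "ping"})
	rxq := <-m.acceptq        // roughly what Accept would return
	fmt.Println((<-rxq).Data) // ping
}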
// Send sends packet via connection
func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId)
var err error
select {
case <-c.closed:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
case c.nodeLink.txq <- txReq{pkt, c.txerr}:
select {
// tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns.
//
// That's why we are also checking for c.closed while waiting
// for reply from serveSend (and leave pkt to finish transmitting).
//
// NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed:
// also poll c.txerr here because: when there is TX error,
// serveSend sends to c.txerr _and_ closes c.closed .
// We still want to return actual transmission error to caller.
select {
case err = <-c.txerr:
return err
default:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
}
case err = <-c.txerr:
//fmt.Printf("%v <- c.txerr\n", err)
return err
}
}
// return err
}
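Conn.txerr is created with capacity 1 precisely so that serveSend can always deliver a transmission result without blocking, even when the sending goroutine has already returned after seeing closed. A minimal sketch of that buffered one-shot backchannel; the fake transmit error below stands in for a real wire write and is an assumption for illustration only:

package main

import (
	"errors"
	"fmt"
)

// txReq mirrors the shape of the request serveSend consumes:
// a packet plus a per-connection backchannel for the result.
type txReq struct {
	data  string
	errch chan error // capacity 1, so the reply below never blocks
}

func serveSend(txq chan txReq) {
	for req := range txq {
		var err error
		if req.data == "" {
			err = errors.New("refusing to send empty packet") // fake tx error
		}
		req.errch <- err // never blocks: the buffer holds one result even if nobody reads it
	}
}

func main() {
	txq := make(chan txReq)
	go serveSend(txq)

	errch := make(chan error, 1) // the equivalent of Conn.txerr
	txq <- txReq{data: "hello", errch: errch}
	fmt.Println(<-errch) // <nil>

	// even if the caller abandons the reply, serveSend does not get stuck:
	txq <- txReq{data: "", errch: make(chan error, 1)}
	fmt.Println("sender is not blocked")
}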
// serveSend handles requests to transmit packets from client connections and
// serially executes them over associated node link.
func (nl *NodeLink) serveSend() {
@@ -416,9 +427,7 @@ func (nl *NodeLink) serveSend() {
}
// ErrClosedConn is the error indicated for read/write operations on closed Conn
var ErrClosedConn = errors.New("read/write on closed connection")
// XXX used ?
func errClosedConn(err error) error {
if err != nil {
return err
@@ -426,87 +435,79 @@ func errClosedConn(err error) error {
return ErrClosedConn
}
// Send packet via connection
func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId)
var err error
select {
case <-c.closed:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
case c.nodeLink.txq <- txReq{pkt, c.txerr}:
select {
// tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns.
//
// That's why we are also checking for c.closed while waiting
// for reply from serveSend (and leave pkt to finish transmitting).
//
// NOTE after we return straight here serveSend won't be later
// blocked on c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed:
// also poll c.txerr here because: when there is TX error,
// serveSend sends to c.txerr _and_ closes c.closed .
// We still want to return actual transmission error to caller.
select {
case err = <-c.txerr:
return err
default:
return ErrClosedConn
// return errClosedConn(c.nodeLink.sendErr) // XXX locking ?
}
case err = <-c.txerr:
//fmt.Printf("%v <- c.txerr\n", err)
return err
}
}
// return err
}

// Receive packet from connection
func (c *Conn) Recv() (*PktBuf, error) {
select {
case <-c.closed:
// XXX get err from c.nodeLink.recvErr
// XXX if nil -> ErrClosedConn ?
return nil, ErrClosedConn // XXX -> EOF ?
case pkt := <-c.rxq: // XXX try to leave only pkt, ok := <-c.rxq
return pkt, nil // XXX error = ?
}
}

// worker for Close() & co
func (c *Conn) close() {
c.closeOnce.Do(func() {
close(c.closed)
})
}

// Close closes connection
// Any blocked Send() or Recv() will be unblocked and return error
//
// NOTE for Send() - once transmission was started - it will complete in the
// background on the wire not to break framing.
func (c *Conn) Close() error {
// adjust nodeLink.connTab
c.nodeLink.connMu.Lock()
delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock()
c.close()
return nil
}

// for convenience: Dial/Listen

// ---- raw IO ----

// sendPkt sends raw packet to peer
// tx error, if any, is returned as is and is analyzed in serveSend
func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
if false {
// XXX -> log
fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
}
// NOTE Write writes data in full, or it is error
_, err := nl.peerLink.Write(pkt.Data)
return err
}

var ErrPktTooSmall = errors.New("packet too small")
var ErrPktTooBig = errors.New("packet too big")

// recvPkt receives raw packet from peer
// rx error, if any, is returned as is and is analyzed in serveRecv
func (nl *NodeLink) recvPkt() (*PktBuf, error) {
// TODO organize rx buffers management (freelist etc)
// TODO cleanup lots of ntoh32(...)
// first read to read pkt header and hopefully up to page of data in 1 syscall
pkt := &PktBuf{make([]byte, 4096)}
// TODO reenable, but NOTE next packet can be also prefetched here -> use buffering ?
//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
if err != nil {
return nil, err
}
pkth := pkt.Header()
// XXX -> better PktHeader.Decode() ?
if ntoh32(pkth.Len) < PktHeadLen {
return nil, ErrPktTooSmall // length is a whole packet len with header
}
if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
return nil, ErrPktTooBig
}
// XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len))
if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
// grow rxbuf
rxbuf2 := make([]byte, ntoh32(pkth.Len))
copy(rxbuf2, pkt.Data[:n])
pkt.Data = rxbuf2
}
// cut .Data len to length of packet
pkt.Data = pkt.Data[:ntoh32(pkth.Len)]
// read rest of pkt data, if we need to
if n < len(pkt.Data) {
_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
if err != nil {
return nil, err
}
}
if false {
// XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
}
return pkt, nil
}

// ---- for convenience: Dial/Listen ----
// Dial connects to address on named network and wraps the connection as NodeLink
// TODO +tls.Config
......