Commit 0fdaf75e authored by Kirill Smelkov

.

parent 78f45c78
@@ -26,7 +26,7 @@ import (
 	"sync"
 	"fmt"
-	"lab.nexedi.com/kirr/go123/xruntime/debug"
+	//"lab.nexedi.com/kirr/go123/xruntime/debug"
 )

 // NodeLink is a node-node link in NEO
@@ -50,8 +50,8 @@ import (
 type NodeLink struct {
 	peerLink   net.Conn         // raw conn to peer

-	//connMu   sync.Mutex       // TODO -> RW ?
-	connMu     debug.Mutex      // TODO -> RW ?
+	connMu     sync.Mutex       // TODO -> RW ?
+	//connMu   debug.Mutex      // TODO -> RW ?
 	connTab    map[uint32]*Conn // connId -> Conn associated with connId
 	nextConnId uint32           // next connId to use for Conn initiated by us
@@ -61,7 +61,7 @@ type NodeLink struct {
 	txq        chan txReq       // tx requests from Conns go via here

-	// errMu   sync.Mutex       -> use connMu
+	errMu      sync.Mutex
 	// sendErr error            // error got from sendPkt, if any
 	recvErr    error            // error got from recvPkt, if any
@@ -143,14 +143,18 @@ func NewNodeLink(conn net.Conn, role LinkRole) *NodeLink {
 	return nl
 }

-// worker for Close & friends. Must be called with connMu held.
-// marks all active Conns and NodeLink itself as closed
+// close is worker for Close & friends.
+// It marks all active Conns and NodeLink itself as closed.
 func (nl *NodeLink) close() {
 	nl.closeOnce.Do(func() {
+		nl.connMu.Lock()
 		for _, conn := range nl.connTab {
-			conn.close()	// XXX explicitly pass error here ?
+			// NOTE anything waking up on Conn.closed must not lock
+			// connMu - else it will deadlock.
+			conn.close()
 		}
 		nl.connTab = nil	// clear + mark closed
+		nl.connMu.Unlock()
 		close(nl.closed)
 	})
@@ -159,9 +163,6 @@ func (nl *NodeLink) close() {
 // Close closes node-node link.
 // IO on connections established over it is automatically interrupted with an error.
 func (nl *NodeLink) Close() error {
-	nl.connMu.Lock()
-	defer nl.connMu.Unlock()
 	nl.close()

 	// close actual link to peer
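The two hunks above move all connMu handling inside close() itself: Close() no longer takes the lock, and the new NOTE states the invariant that whatever wakes up on Conn.closed must not take connMu, else it would deadlock against close() still holding it. A minimal self-contained sketch of that pattern (illustrative names and types, not the actual NodeLink code):

```go
package main

import "sync"

type conn struct {
	closed chan struct{}
}

// close only signals the conn; it deliberately takes no locks, so goroutines
// woken up on c.closed cannot deadlock against link.mu held by link.close.
func (c *conn) close() {
	close(c.closed)
}

type link struct {
	mu        sync.Mutex
	connTab   map[uint32]*conn
	closed    chan struct{}
	closeOnce sync.Once
}

// close does its own locking (callers no longer hold mu) and marks all conns
// and the link itself as closed, exactly once.
func (l *link) close() {
	l.closeOnce.Do(func() {
		l.mu.Lock()
		for _, c := range l.connTab {
			c.close()
		}
		l.connTab = nil // clear + mark closed
		l.mu.Unlock()
		close(l.closed)
	})
}

func (l *link) Close() error {
	l.close()
	return nil
}

func main() {
	l := &link{
		connTab: map[uint32]*conn{1: {closed: make(chan struct{})}},
		closed:  make(chan struct{}),
	}
	l.Close()
	<-l.closed // link is marked closed; conn 1's closed channel is closed too
}
```

Because the body runs under closeOnce, Close, serveRecv and serveSend can all call close() on their own error paths without coordinating who takes the lock first.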
@@ -175,33 +176,27 @@ func (nl *NodeLink) Close() error {
 }

 // sendPkt sends raw packet to peer
+// tx error, if any, is returned as is and is analyzed in serveSend
 func (nl *NodeLink) sendPkt(pkt *PktBuf) error {
 	if true {
 		// XXX -> log
 		fmt.Printf("%v > %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
-	}
-
-	// XXX if nl is closed peerLink will return "io on closed xxx" but
-	// maybe better to check explicitly and return ErrClosedLink
-	_, err := nl.peerLink.Write(pkt.Data)	// FIXME write Data in full
 	//defer fmt.Printf("\t-> sendPkt err: %v\n", err)
-	if err != nil {
-		// XXX do we need to retry if err is temporary?
-		// TODO data could be written partially and thus the message stream is now broken
-		// -> close connection / whole NodeLink ?
 	}

+	// NOTE Write writes data in full, or it is error
+	_, err := nl.peerLink.Write(pkt.Data)
 	return err
 }

+var ErrPktTooSmall = errors.New("packet too small")
+var ErrPktTooBig = errors.New("packet too big")
+
 // recvPkt receives raw packet from peer
+// rx error, if any, is returned as is and is analyzed in serveRecv
 func (nl *NodeLink) recvPkt() (*PktBuf, error) {
-	// XXX if nl is closed peerLink will return "io on closed xxx" but
-	// maybe better to check explicitly and return ErrClosedLink
 	// TODO organize rx buffers management (freelist etc)
 	// TODO cleanup lots of ntoh32(...)
-	// XXX do we need to retry if err is temporary?
-	// TODO on error framing is broken -> close connection / whole NodeLink ?

 	// first read to read pkt header and hopefully up to page of data in 1 syscall
 	pkt := &PktBuf{make([]byte, 4096)}
@@ -209,22 +204,20 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
 	//n, err := io.ReadAtLeast(nl.peerLink, pkt.Data, PktHeadLen)
 	n, err := io.ReadFull(nl.peerLink, pkt.Data[:PktHeadLen])
 	if err != nil {
-		return nil, err	// XXX err adjust ? -> (?) framing error
+		return nil, err
 	}

 	pkth := pkt.Header()

 	// XXX -> better PktHeader.Decode() ?
 	if ntoh32(pkth.Len) < PktHeadLen {
-		// TODO framing error -> nl.CloseWithError(err)
-		panic("TODO pkt.Len < PktHeadLen")	// length is a whole packet len with header
+		return nil, ErrPktTooSmall	// length is a whole packet len with header
 	}
 	if ntoh32(pkth.Len) > MAX_PACKET_SIZE {
-		// TODO framing error -> nl.CloseWithError(err)
-		panic("TODO message too big")
+		return nil, ErrPktTooBig
 	}

-	//pkt.Data = xbytes.Resize32(pkt.Data, ntoh32(pkth.Len))
+	//pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len))
 	if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
 		// grow rxbuf
 		rxbuf2 := make([]byte, ntoh32(pkth.Len))
@@ -238,7 +231,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
 	if n < len(pkt.Data) {
 		_, err = io.ReadFull(nl.peerLink, pkt.Data[n:])
 		if err != nil {
-			return nil, err	// XXX err adjust ? -> (?) framing error
+			return nil, err
 		}
 	}
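With the two recvPkt hunks above, malformed lengths are now reported as ErrPktTooSmall / ErrPktTooBig instead of panicking, while the read-header-then-rest structure stays. Below is a self-contained sketch of that framing scheme over a plain io.Reader; the 8-byte header layout, big-endian length field and the size limit are assumptions made for the example, not the real NEO packet header:

```go
package main

import (
	"bytes"
	"encoding/binary"
	"errors"
	"fmt"
	"io"
)

// assumed frame layout for the example: ConnId (4 bytes) + Len (4 bytes),
// where Len counts the whole packet, header included
const (
	headLen = 8
	maxLen  = 0x4000000 // stand-in for MAX_PACKET_SIZE
)

var (
	errTooSmall = errors.New("packet too small")
	errTooBig   = errors.New("packet too big")
)

// recvFrame reads one whole frame (header included) from r,
// or returns a framing/IO error.
func recvFrame(r io.Reader) ([]byte, error) {
	buf := make([]byte, 4096)

	// fixed-size header first
	if _, err := io.ReadFull(r, buf[:headLen]); err != nil {
		return nil, err
	}
	l := binary.BigEndian.Uint32(buf[4:8])
	if l < headLen {
		return nil, errTooSmall
	}
	if l > maxLen {
		return nil, errTooBig
	}
	// grow the rx buffer if the packet does not fit
	if l > uint32(cap(buf)) {
		grown := make([]byte, l)
		copy(grown, buf[:headLen])
		buf = grown
	}
	buf = buf[:l]

	// then the rest of the payload
	if _, err := io.ReadFull(r, buf[headLen:]); err != nil {
		return nil, err
	}
	return buf, nil
}

func main() {
	// ConnId=1, Len=12, 4 bytes of payload
	frame := []byte{0, 0, 0, 1, 0, 0, 0, 12, 'd', 'a', 't', 'a'}
	got, err := recvFrame(bytes.NewReader(frame))
	fmt.Println(len(got), err) // 12 <nil>
}
```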
@@ -268,6 +261,7 @@ func (nl *NodeLink) NewConn() *Conn {
 	nl.connMu.Lock()
 	defer nl.connMu.Unlock()
 	if nl.connTab == nil {
+		// XXX -> error (because NodeLink can become "closed" due to IO errors ?
 		panic("NewConn() on closed node-link")
 	}
 	c := nl.newConn(nl.nextConnId)
@@ -290,14 +284,14 @@ func (nl *NodeLink) Accept() (*Conn, error) {
 	case <-nl.closed:
 		return nil, ErrLinkClosed	// XXX + op = Accept ?

-	case c := <-nl.acceptq:
+	case c := <-nl.acceptq:	// XXX -> only c, ok := <-nl.acceptq ?
 		return c, nil
 	}
 }

 // serveRecv handles incoming packets routing them to either appropriate
-// already-established connection or to new handling goroutine.
+// already-established connection or to new handling goroutine. XXX Accept
 func (nl *NodeLink) serveRecv() {
 	defer nl.serveWg.Done()
 	for {
@@ -310,56 +304,49 @@ func (nl *NodeLink) serveRecv() {
 			select {
 			case <-nl.closed:
-				// error due to closing NodeLink
+				// error was due to closing NodeLink
 				err = ErrLinkClosed
 			default:
 			}

-			println("\tzzz")
-			nl.connMu.Lock()
-			println("\tzzz 2")
-			defer nl.connMu.Unlock()
+			// TODO protect with errMu
 			nl.recvErr = err
-			println("\trrr")

 			// wake-up all conns & mark node link as closed
 			nl.close()
-			println("\tsss")
 			return
 		}

 		// pkt.ConnId -> Conn
 		connId := ntoh32(pkt.Header().ConnId)
-		accept := false

 		nl.connMu.Lock()
 		conn := nl.connTab[connId]
 		if conn == nil {
 			if nl.acceptq != nil {
 				// we are accepting new incoming connection
 				conn = nl.newConn(connId)
-				accept = true
+				// XXX what if Accept exited because of just recently close(nl.closed)?
+				// -> check nl.closed here too ?
+				nl.acceptq <- conn
 			}
 		}
-		nl.connMu.Unlock()

 		// we have not accepted incoming connection - ignore packet
 		if conn == nil {
-			// XXX also log?
+			// XXX also log / increment counter?
+			nl.connMu.Unlock()
 			continue
 		}

-		if accept {
-			// XXX what if Accept exited because of just recently close(nl.closed)?
-			// -> check nl.closed here too ?
-			nl.acceptq <- conn
-		}
-
 		// route packet to serving goroutine handler
 		// XXX what if Conn.Recv exited because of just recently close(nl.closed) ?
 		// -> check nl.closed here too ?
 		conn.rxq <- pkt
+
+		// keep connMu locked until here: so that ^^^ `conn.rxq <- pkt` can be
+		// sure conn stays not closed e.g. by Conn.Close
+		nl.connMu.Unlock()
 	}
 }
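The serveRecv hunk above keeps connMu held from the connTab lookup all the way through `conn.rxq <- pkt`, and hands a freshly accepted conn to acceptq under that same lock, so a concurrent Conn.Close cannot invalidate the conn between lookup and delivery. A rough sketch of this route-under-lock idea with made-up types (not the NodeLink code); it stays deadlock-free only because, per the NOTE in close(), nothing woken up on the receiving side takes the same lock:

```go
package main

import (
	"fmt"
	"sync"
)

type pkt struct {
	connId uint32
	data   string
}

type conn struct {
	rxq chan pkt
}

type mux struct {
	mu      sync.Mutex
	connTab map[uint32]*conn
}

// dispatch looks the destination conn up and delivers the packet while still
// holding mu, so a concurrent close cannot invalidate conn in between.
func (m *mux) dispatch(p pkt) bool {
	m.mu.Lock()
	defer m.mu.Unlock()
	c := m.connTab[p.connId]
	if c == nil {
		return false // no such conn and not accepting - drop the packet
	}
	// whoever removes conns from connTab must take mu first,
	// so c stays alive for this send
	c.rxq <- p
	return true
}

func main() {
	c := &conn{rxq: make(chan pkt, 1)} // buffered, so this example never blocks under mu
	m := &mux{connTab: map[uint32]*conn{7: c}}
	m.dispatch(pkt{connId: 7, data: "hello"})
	fmt.Println(<-c.rxq) // {7 hello}
}
```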
@@ -395,21 +382,19 @@ func (nl *NodeLink) serveSend() {
 			}
 		}

-			txreq.errch <- err
+			txreq.errch <- err	// XXX recheck wakeup logic for err case

 			if err != nil {
-				nl.connMu.Lock()
-				defer nl.connMu.Unlock()
+				// XXX use errMu to lock vvv if needed
 				// nl.sendErr = err

 				// wake-up all conns & mark node link as closed
 				nl.close()
-			}

-			return
+				return
+			}
 		}
 	}
 }
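In serveSend the order on a tx error is now: answer the waiting sender via txreq.errch first, then mark the whole link closed and leave the loop. A small sketch of that order with made-up types; the real loop also selects on nl.closed, which is omitted here:

```go
package main

import (
	"errors"
	"fmt"
)

type txReq struct {
	data  string
	errch chan error
}

// serveSend drains txq, reporting each send result back on the request's
// errch; on a tx error it marks the link closed and stops serving.
func serveSend(txq chan txReq, send func(string) error, closeLink func()) {
	for txreq := range txq {
		err := send(txreq.data)
		txreq.errch <- err // wake up the Send caller in any case
		if err != nil {
			// tx is broken: mark the whole link closed and stop serving
			closeLink()
			return
		}
	}
}

func main() {
	txq := make(chan txReq)
	done := make(chan struct{})
	go serveSend(txq,
		func(string) error { return errors.New("broken pipe") },
		func() { close(done) })

	errch := make(chan error)
	txq <- txReq{"ping", errch}
	fmt.Println(<-errch) // broken pipe
	<-done               // link was closed
}
```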
@@ -468,7 +453,7 @@ func (c *Conn) Recv() (*PktBuf, error) {
 		// XXX if nil -> ErrClosedConn ?
 		return nil, ErrClosedConn	// XXX -> EOF ?

-	case pkt := <-c.rxq:
+	case pkt := <-c.rxq:	// XXX try to leave only pkt, ok := <-c.rxq
 		return pkt, nil	// XXX error = ?
 	}
 }
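The XXX added in this hunk suggests collapsing the closed/rxq select into a single comma-ok receive, which only works if closing a Conn also closes its rxq channel. A tiny illustration of the comma-ok semantics such a change would rely on (not the actual Conn code):

```go
package main

import "fmt"

func main() {
	rxq := make(chan string, 1)
	rxq <- "pkt1"
	close(rxq)

	pkt, ok := <-rxq
	fmt.Println(pkt, ok) // "pkt1 true"  - buffered value is still delivered

	pkt, ok = <-rxq
	fmt.Println(pkt, ok) // " false"     - channel closed and drained
}
```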
...
@@ -158,7 +158,7 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
 func TestNodeLink(t *testing.T) {
 	// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)

-	/*
+	///*
 	// Close vs recvPkt
 	nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
 	wg := WorkGroup()
@@ -323,13 +323,13 @@ func TestNodeLink(t *testing.T) {
 	xclose(c11)
 	xclose(c12)
 	xclose(nl2)
-	*/
+	//*/

 	// NodeLink.Close vs Conn.Send/Recv on another side	TODO
-	nl1, nl2 := _nodeLinkPipe(0, linkNoRecvSend)
-	c11 := nl1.NewConn()
-	c12 := nl1.NewConn()
-	wg := WorkGroup()
+	nl1, nl2 = _nodeLinkPipe(0, linkNoRecvSend)
+	c11 = nl1.NewConn()
+	c12 = nl1.NewConn()
+	wg = WorkGroup()
 	wg.Gox(func() {
 		println(">>> RECV START")
 		pkt, err := c11.Recv()
@@ -362,7 +362,7 @@ func TestNodeLink(t *testing.T) {
 	xclose(nl1)
 	println("333")

-	/*
+	///*
 	// Conn accept + exchange
 	nl1, nl2 = nodeLinkPipe()
 	wg = WorkGroup()
@@ -448,5 +448,5 @@ func TestNodeLink(t *testing.T) {
 	xclose(c2)
 	xclose(nl1)
 	xclose(nl2)
-	*/
+	//*/
 }