Commit 49f0c8b9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0fdaf75e
...@@ -62,8 +62,9 @@ type NodeLink struct { ...@@ -62,8 +62,9 @@ type NodeLink struct {
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
errMu sync.Mutex errMu sync.Mutex
// sendErr error // error got from sendPkt, if any // errSend error // error got from sendPkt, if any
recvErr error // error got from recvPkt, if any errRecv error // error got from recvPkt, if any
errClose error // error got from peerLink.Close
// once because: NodeLink has to be explicitly closed by user; it can also // once because: NodeLink has to be explicitly closed by user; it can also
// be "closed" by IO errors on peerLink // be "closed" by IO errors on peerLink
...@@ -157,6 +158,12 @@ func (nl *NodeLink) close() { ...@@ -157,6 +158,12 @@ func (nl *NodeLink) close() {
nl.connMu.Unlock() nl.connMu.Unlock()
close(nl.closed) close(nl.closed)
// close actual link to peer. this will wakeup serve{Send,Recv}
// NOTE we need it here so that e.g. aborting on error serveSend wakes up serveRecv
nl.errMu.Lock()
nl.errClose = nl.peerLink.Close()
nl.errMu.Unlock()
}) })
} }
...@@ -165,14 +172,9 @@ func (nl *NodeLink) close() { ...@@ -165,14 +172,9 @@ func (nl *NodeLink) close() {
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
nl.close() nl.close()
// close actual link to peer
// this will wakeup serve{Send,Recv}
err := nl.peerLink.Close()
// wait for serve{Send,Recv} to complete // wait for serve{Send,Recv} to complete
nl.serveWg.Wait() nl.serveWg.Wait()
return nl.errClose
return err
} }
// sendPkt sends raw packet to peer // sendPkt sends raw packet to peer
...@@ -217,7 +219,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -217,7 +219,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
return nil, ErrPktTooBig return nil, ErrPktTooBig
} }
//pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len)) // XXX -> pkt.Data = xbytes.Resize32(pkt.Data[:n], ntoh32(pkth.Len))
if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) { if ntoh32(pkth.Len) > uint32(cap(pkt.Data)) {
// grow rxbuf // grow rxbuf
rxbuf2 := make([]byte, ntoh32(pkth.Len)) rxbuf2 := make([]byte, ntoh32(pkth.Len))
...@@ -244,7 +246,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -244,7 +246,8 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
} }
// worker for NewConn() & friends. Must be called with connMu held. // newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn { func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl, c := &Conn{nodeLink: nl,
connId: connId, connId: connId,
...@@ -270,7 +273,7 @@ func (nl *NodeLink) NewConn() *Conn { ...@@ -270,7 +273,7 @@ func (nl *NodeLink) NewConn() *Conn {
} }
// ErrLinkClosed is the error indicated for operations on closed NodeLink // ErrLinkClosed is the error indicated for operations on closed NodeLink
var ErrLinkClosed = errors.New("node link closed") var ErrLinkClosed = errors.New("node link is closed")
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
// Accept waits for and accepts incoming connection on top of node-node link // Accept waits for and accepts incoming connection on top of node-node link
...@@ -300,7 +303,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -300,7 +303,7 @@ func (nl *NodeLink) serveRecv() {
fmt.Printf("recvPkt -> %v, %v\n", pkt, err) fmt.Printf("recvPkt -> %v, %v\n", pkt, err)
if err != nil { if err != nil {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we are marking node link and all connections as closed // so we mark node link and all connections as closed and stop service
select { select {
case <-nl.closed: case <-nl.closed:
...@@ -309,11 +312,14 @@ func (nl *NodeLink) serveRecv() { ...@@ -309,11 +312,14 @@ func (nl *NodeLink) serveRecv() {
default: default:
} }
// TODO protect with errMu nl.errMu.Lock()
nl.recvErr = err nl.errRecv = err
nl.errMu.Unlock()
// wake-up all conns & mark node link as closed // wake-up all conns & mark node link as closed
// NOTE this also wakeups serveSend/sendPkt
nl.close() nl.close()
return return
} }
...@@ -373,10 +379,11 @@ func (nl *NodeLink) serveSend() { ...@@ -373,10 +379,11 @@ func (nl *NodeLink) serveSend() {
if err != nil { if err != nil {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we are marking node link and all connections as closed // so mark node link and all connections as closed and stop service
select { select {
case <-nl.closed: case <-nl.closed:
// error due to closing NodeLink // error was due to closing NodeLink
err = ErrLinkClosed err = ErrLinkClosed
default: default:
} }
...@@ -389,6 +396,7 @@ func (nl *NodeLink) serveSend() { ...@@ -389,6 +396,7 @@ func (nl *NodeLink) serveSend() {
// nl.sendErr = err // nl.sendErr = err
// wake-up all conns & mark node link as closed // wake-up all conns & mark node link as closed
// NOTE this also wakeups serveRecv/recvPkt
nl.close() nl.close()
return return
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment