Commit 55756877 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 98ff316b
...@@ -62,16 +62,14 @@ type NodeLink struct { ...@@ -62,16 +62,14 @@ type NodeLink struct {
// (rx packets are routed to Conn.rxq) // (rx packets are routed to Conn.rxq)
down chan struct{} // ready when NodeLink is marked as no longer operational down chan struct{} // ready when NodeLink is marked as no longer operational
downOnce sync.Once // shutdown may be due both Close and IO error downOnce sync.Once // shutdown may be due to both Close and IO error
downWg sync.WaitGroup // for activities at shutdown downWg sync.WaitGroup // for activities at shutdown
errClose error // error got from peerLink.Close on shutdown errClose error // error got from peerLink.Close
errMu sync.Mutex errMu sync.Mutex
errRecv error // error got from recvPkt on shutdown errRecv error // error got from recvPkt on shutdown
closeCalled uint32 // whether Close was called closed uint32 // whether Close was called
} }
// Conn is a connection established over NodeLink // Conn is a connection established over NodeLink
...@@ -84,19 +82,17 @@ type Conn struct { ...@@ -84,19 +82,17 @@ type Conn struct {
nodeLink *NodeLink nodeLink *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit errors for this Conn go back here txerr chan error // transmit results for this Conn go back here
down chan struct{} // ready when Conn is marked as no longer operational down chan struct{} // ready when Conn is marked as no longer operational
downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown downOnce sync.Once // shutdown may be called by both Close and nodelink.shutdown
closeCalled uint32 // whether Close was called; ^^^ can be from IO error on node link rxerrOnce sync.Once // IO error is reported only once - then it is link down or closed
rxerrOnce sync.Once // XXX whether actual RX error was already reported to caller closed uint32 // whether Close was called
} }
// ErrLinkClosed is the error indicated for operations on closed NodeLink var ErrLinkClosed = errors.New("node link is closed") // operations on closed NodeLink
var ErrLinkClosed = errors.New("node link is closed") // XXX -> read/write but also Accept ? var ErrLinkDown = errors.New("node link is down") // e.g. due to IO error
var ErrLinkDown = errors.New("node link is down") // XXX due to IO errors?
var ErrLinkNoListen = errors.New("node link is not listening for incoming connections") var ErrLinkNoListen = errors.New("node link is not listening for incoming connections")
var ErrClosedConn = errors.New("read/write on closed connection") var ErrClosedConn = errors.New("read/write on closed connection")
...@@ -173,7 +169,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -173,7 +169,7 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
nl.connMu.Lock() nl.connMu.Lock()
defer nl.connMu.Unlock() defer nl.connMu.Unlock()
if nl.connTab == nil { if nl.connTab == nil {
if atomic.LoadUint32(&nl.closeCalled) != 0 { if atomic.LoadUint32(&nl.closed) != 0 {
return nil, ErrLinkClosed return nil, ErrLinkClosed
} }
return nil, ErrLinkDown return nil, ErrLinkDown
...@@ -218,7 +214,7 @@ func (nl *NodeLink) shutdown() { ...@@ -218,7 +214,7 @@ func (nl *NodeLink) shutdown() {
// Close closes node-node link. // Close closes node-node link.
// IO on connections established over it is automatically interrupted with an error. // IO on connections established over it is automatically interrupted with an error.
func (nl *NodeLink) Close() error { func (nl *NodeLink) Close() error {
atomic.StoreUint32(&nl.closeCalled, 1) atomic.StoreUint32(&nl.closed, 1)
nl.shutdown() nl.shutdown()
nl.downWg.Wait() nl.downWg.Wait()
return nl.errClose return nl.errClose
...@@ -242,7 +238,7 @@ func (c *Conn) Close() error { ...@@ -242,7 +238,7 @@ func (c *Conn) Close() error {
delete(c.nodeLink.connTab, c.connId) delete(c.nodeLink.connTab, c.connId)
c.nodeLink.connMu.Unlock() c.nodeLink.connMu.Unlock()
atomic.StoreUint32(&c.closeCalled, 1) atomic.StoreUint32(&c.closed, 1)
c.shutdown() c.shutdown()
return nil return nil
} }
...@@ -256,7 +252,7 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -256,7 +252,7 @@ func (nl *NodeLink) Accept() (*Conn, error) {
select { select {
case <-nl.down: case <-nl.down:
if atomic.LoadUint32(&nl.closeCalled) != 0 { if atomic.LoadUint32(&nl.closed) != 0 {
return nil, ErrLinkClosed // XXX + op = Accept ? return nil, ErrLinkClosed // XXX + op = Accept ?
} }
return nil, ErrLinkDown // XXX test return nil, ErrLinkDown // XXX test
...@@ -269,10 +265,10 @@ func (nl *NodeLink) Accept() (*Conn, error) { ...@@ -269,10 +265,10 @@ func (nl *NodeLink) Accept() (*Conn, error) {
// errRecvShutdown returns appropriate error when c.down is found ready in Recv // errRecvShutdown returns appropriate error when c.down is found ready in Recv
func (c *Conn) errRecvShutdown() error { func (c *Conn) errRecvShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closeCalled) != 0: case atomic.LoadUint32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0: case atomic.LoadUint32(&c.nodeLink.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
...@@ -367,14 +363,14 @@ type txReq struct { ...@@ -367,14 +363,14 @@ type txReq struct {
// errSendShutdown returns approproate error when c.down is found ready in Send // errSendShutdown returns approproate error when c.down is found ready in Send
func (c *Conn) errSendShutdown() error { func (c *Conn) errSendShutdown() error {
switch { switch {
case atomic.LoadUint32(&c.closeCalled) != 0: case atomic.LoadUint32(&c.closed) != 0:
return ErrClosedConn return ErrClosedConn
// the only other error possible besides Conn being .Close()'ed is that // the only other error possible besides Conn being .Close()'ed is that
// NodeLink was closed/shutdowned itself - on actual IO problems corresponding // NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it. // error is delivered to particular Send that caused it.
case atomic.LoadUint32(&c.nodeLink.closeCalled) != 0: case atomic.LoadUint32(&c.nodeLink.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment