Commit e1a3677f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 91be5cdd
...@@ -62,7 +62,6 @@ type NodeLink struct { ...@@ -62,7 +62,6 @@ type NodeLink struct {
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept acceptq chan *Conn // queue of incoming connections for Accept
// = nil if NodeLink is not accepting connections <- XXX no
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq) // (rx packets are routed to Conn.rxq)
...@@ -88,7 +87,7 @@ type NodeLink struct { ...@@ -88,7 +87,7 @@ type NodeLink struct {
// //
// It is safe to use Conn from multiple goroutines simultaneously. // It is safe to use Conn from multiple goroutines simultaneously.
type Conn struct { type Conn struct {
nodeLink *NodeLink link *NodeLink
connId uint32 connId uint32
rxq chan *PktBuf // received packets for this Conn go here rxq chan *PktBuf // received packets for this Conn go here
txerr chan error // transmit results for this Conn go back here txerr chan error // transmit results for this Conn go back here
...@@ -187,7 +186,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -187,7 +186,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
// newConn creates new Conn with id=connId and registers it into connTab. // newConn creates new Conn with id=connId and registers it into connTab.
// Must be called with connMu held. // Must be called with connMu held.
func (nl *NodeLink) newConn(connId uint32) *Conn { func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl, c := &Conn{link: nl,
connId: connId, connId: connId,
rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf rxq: make(chan *PktBuf, 1), // NOTE non-blocking - see serveRecv XXX +buf
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
...@@ -382,37 +381,7 @@ func (c *Conn) CloseRecv() { ...@@ -382,37 +381,7 @@ func (c *Conn) CloseRecv() {
// //
// It is safe to call Close several times. // It is safe to call Close several times.
func (c *Conn) Close() error { func (c *Conn) Close() error {
nl := c.nodeLink nl := c.link
/*
// adjust nodeLink.connTab
nl.connMu.Lock()
if nl.connTab != nil {
// connection was initiated by us - simply delete - we always
// know if a packet comes to such connection it is closed.
if c.connId == nl.nextConnId % 2 {
delete(nl.connTab, c.connId)
// connection was initiated by peer which we accepted - put special
// "closed" connection into connTab entry for some time to reply
// "connection closed" if another packet comes to it.
} else {
// XXX we do not need to create new connection - enough to put our
// connection into proper state and delete it after some time - right?
cc := nl.newConn(c.connId)
cc.shutdownRX(errConnClosed)
time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock()
delete(nl.connTab, cc.connId)
nl.connMu.Unlock()
cc.shutdown()
})
}
}
nl.connMu.Unlock()
*/
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
atomic.StoreInt32(&c.rxclosed, 1) atomic.StoreInt32(&c.rxclosed, 1)
atomic.StoreInt32(&c.txclosed, 1) atomic.StoreInt32(&c.txclosed, 1)
...@@ -500,7 +469,7 @@ func (c *Conn) errRecvShutdown() error { ...@@ -500,7 +469,7 @@ func (c *Conn) errRecvShutdown() error {
case atomic.LoadInt32(&c.rxclosed) != 0: case atomic.LoadInt32(&c.rxclosed) != 0:
return ErrClosedConn return ErrClosedConn
case atomic.LoadInt32(&c.nodeLink.closed) != 0: case atomic.LoadInt32(&c.link.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
...@@ -509,9 +478,9 @@ func (c *Conn) errRecvShutdown() error { ...@@ -509,9 +478,9 @@ func (c *Conn) errRecvShutdown() error {
// tell client the node link is no longer operational. // tell client the node link is no longer operational.
var err error var err error
c.rxerrOnce.Do(func() { c.rxerrOnce.Do(func() {
c.nodeLink.errMu.Lock() c.link.errMu.Lock()
err = c.nodeLink.errRecv err = c.link.errRecv
c.nodeLink.errMu.Unlock() c.link.errMu.Unlock()
}) })
if err == nil { if err == nil {
err = ErrLinkDown err = ErrLinkDown
...@@ -727,7 +696,7 @@ func (c *Conn) errSendShutdown() error { ...@@ -727,7 +696,7 @@ func (c *Conn) errSendShutdown() error {
// NodeLink was closed/shutdowned itself - on actual IO problems corresponding // NodeLink was closed/shutdowned itself - on actual IO problems corresponding
// error is delivered to particular Send that caused it. // error is delivered to particular Send that caused it.
case atomic.LoadInt32(&c.nodeLink.closed) != 0: case atomic.LoadInt32(&c.link.closed) != 0:
return ErrLinkClosed return ErrLinkClosed
default: default:
...@@ -750,12 +719,12 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error { ...@@ -750,12 +719,12 @@ func (c *Conn) sendPkt2(pkt *PktBuf) error {
case <-c.txdown: case <-c.txdown:
return c.errSendShutdown() return c.errSendShutdown()
case c.nodeLink.txq <- txReq{pkt, c.txerr}: case c.link.txq <- txReq{pkt, c.txerr}:
select { select {
// tx request was sent to serveSend and is being transmitted on the wire. // tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and // the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrupt is // we cannot interrupt it as the only way to interrupt is
// .nodeLink.Close() which will close all other Conns. // .link.Close() which will close all other Conns.
// //
// That's why we are also checking for c.txdown while waiting // That's why we are also checking for c.txdown while waiting
// for reply from serveSend (and leave pkt to finish transmitting). // for reply from serveSend (and leave pkt to finish transmitting).
...@@ -1140,7 +1109,7 @@ func (nl *NodeLink) RemoteAddr() net.Addr { ...@@ -1140,7 +1109,7 @@ func (nl *NodeLink) RemoteAddr() net.Addr {
// Link returns underlying NodeLink of this connection. // Link returns underlying NodeLink of this connection.
func (c *Conn) Link() *NodeLink { func (c *Conn) Link() *NodeLink {
return c.nodeLink return c.link
} }
// ConnID returns connection identifier used for the connection. // ConnID returns connection identifier used for the connection.
...@@ -1157,7 +1126,7 @@ func (nl *NodeLink) String() string { ...@@ -1157,7 +1126,7 @@ func (nl *NodeLink) String() string {
} }
func (c *Conn) String() string { func (c *Conn) String() string {
s := fmt.Sprintf("%s .%d", c.nodeLink, c.connId) s := fmt.Sprintf("%s .%d", c.link, c.connId)
return s // XXX add "(closed)" if c is closed ? return s // XXX add "(closed)" if c is closed ?
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment