Commit 9fa79958 authored by Kirill Smelkov

X draft how to mark RX down without reallocating .rxdown

This automatically removes the select in serveRecv.
parent 5b61fa33
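
In short: the per-Conn select on the rxdown channel goes away; serveRecv marks itself busy via rxqActive, checks rxdownFlag before queuing, and downRX drains whatever was already queued. Below is a minimal, self-contained sketch of that handshake with simplified stand-in types (int packets instead of *PktBuf, no NodeLink); it only illustrates the draft's idea and is not the actual connection.go code.

package main

import (
	"fmt"
	"sync/atomic"
)

// conn is a stand-in for Conn, reduced to the fields this draft adds.
type conn struct {
	rxq        chan int // stands in for chan *PktBuf
	rxqActive  int32    // atomic: 1 while the routing loop is inside `rxq <- ...`
	rxdownFlag int32    // atomic: 1 once RX is marked no longer operational
}

// route is what serveRecv does per packet in the draft: no select, only flag checks.
func (c *conn) route(pkt int) {
	atomic.StoreInt32(&c.rxqActive, 1)
	if atomic.LoadInt32(&c.rxdownFlag) == 0 {
		c.rxq <- pkt
	}
	atomic.StoreInt32(&c.rxqActive, 0)
}

// downRX marks RX as down and dequeues packets already sitting in rxq
// (the real code would go reply "connection closed" for them).
func (c *conn) downRX() {
	atomic.StoreInt32(&c.rxdownFlag, 1)
	for {
		select {
		case <-c.rxq:
			// dropped
		default:
			if atomic.LoadInt32(&c.rxqActive) == 0 && len(c.rxq) == 0 {
				return
			}
		}
	}
}

func main() {
	c := &conn{rxq: make(chan int, 1)} // buffered, as the draft's NOTE requires
	c.route(1)                         // queued for a (never arriving) Recv
	c.downRX()                         // drains the queued packet, marks RX down
	c.route(2)                         // dropped: rxdownFlag is already set
	fmt.Println("left in rxq after shutdown:", len(c.rxq)) // 0
}
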
@@ -94,20 +94,25 @@ type NodeLink struct {
 type Conn struct {
 	link       *NodeLink
 	connId     uint32
-	rxq        chan *PktBuf   // received packets for this Conn go here
+
+	rxq        chan *PktBuf   // received packets for this Conn go here
+	rxqActive  int32          // atomic: 1 while serveRecv is doing `rxq <- ...`
+	rxdownFlag int32          // atomic: 1 when RX is marked no longer operational
+
+	// !light only
+	rxdown     chan struct{}  // ready when RX is marked no longer operational
+	rxdownOnce sync.Once      // ----//----	XXX review
+	rxclosed   int32          // whether CloseRecv was called
+	rxerrOnce  sync.Once      // rx error is reported only once - then it is link down or closed
+	errMsg     *Error         // error message for peer if rx is down
 
 	txerr      chan error     // transmit results for this Conn go back here
 	txdown     chan struct{}  // ready when Conn TX is marked as no longer operational
-	rxdown     chan struct{}  // ----//---- RX
 	txdownOnce sync.Once      // tx shutdown may be called by both Close and nodelink.shutdown
-	rxdownOnce sync.Once      // ----//----
-	rxerrOnce  sync.Once      // rx error is reported only once - then it is link down or closed
-	rxclosed   int32          // whether CloseRecv was called
 	txclosed   int32          // whether CloseSend was called
-	errMsg     *Error         // error message for peer if rx is down
 
 	// closing Conn is shutdown + some cleanup work to remove it from
 	// link.connTab including arming timers etc. Let this work be spawned only once.
 	// (for Conn.Close to be valid called several times)
@@ -130,7 +135,7 @@ type LinkError struct {
 // ConnError is returned by Conn operations
 type ConnError struct {
-	Conn *Conn
+	Conn *Conn	// XXX Conn's are reused -> connId/link explicitly?
 	Op   string
 	Err  error
 }
@@ -320,7 +325,7 @@ func (nl *NodeLink) Close() error {
 	return nl.err("close", nl.errClose)
 }
 
-// shutdown marks connection as no longer operational
+// shutdown marks connection as no longer operational and interrupts Send and Recv.
 func (c *Conn) shutdown() {
 	c.shutdownTX()
 	c.shutdownRX(errConnClosed)
@@ -332,32 +337,41 @@ func (c *Conn) shutdownTX() {
 	})
 }
 
-// shutdownRX marks .rxq as no loner operational
+// shutdownRX marks .rxq as no loner operational and interrupts Recv.
 func (c *Conn) shutdownRX(errMsg *Error) {
 	c.rxdownOnce.Do(func() {
 		c.errMsg = errMsg
-		close(c.rxdown)
-		// dequeue all packets already queued in c.rxq
-		// (once serveRecv sees c.rxdown it won't try to put new packets into
-		// c.rxq, but something finite could be already there)
-		i := 0
+		close(c.rxdown)	// wakeup Conn.Recv
+		c.downRX()
+	})
+}
+
+// downRX marks .rxq as no longer operational
+func (c *Conn) downRX() {
+	atomic.StoreInt32(&c.rxdownFlag, 1)
+
+	// dequeue all packets already queued in c.rxq
+	// (once serveRecv sees c.rxdown it won't try to put new packets into
+	// c.rxq, but something finite could be already there)
+	i := 0
 loop:
 	for {
 		select {
 		case <-c.rxq:
 			i++
 		default:
+			if atomic.LoadInt32(&c.rxqActive) == 0 && len(c.rxq) == 0 {
 				break loop
+			}
 		}
 	}
 
 	// if something was queued already there - reply "connection closed"
 	if i != 0 {
 		go c.replyNoConn()
 	}
-	})
 }
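
A note on the exit condition in downRX above: checking len(c.rxq) == 0 alone would not be enough, because serveRecv may have already passed its rxdownFlag check and be about to (or blocked in) `conn.rxq <- pkt`; rxqActive == 0 guarantees no such send is in flight, and anything that did get enqueued is picked up by a later iteration of the drain loop. The following standalone sketch (same simplified stand-in types as above, not the real code) races a routing goroutine against downRX and shows that nothing is left queued once downRX returns.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type conn struct {
	rxq        chan int
	rxqActive  int32
	rxdownFlag int32
}

func (c *conn) route(pkt int) {
	atomic.StoreInt32(&c.rxqActive, 1)
	if atomic.LoadInt32(&c.rxdownFlag) == 0 {
		c.rxq <- pkt
	}
	atomic.StoreInt32(&c.rxqActive, 0)
}

// downRX returns how many already-queued packets it had to throw away.
func (c *conn) downRX() (drained int) {
	atomic.StoreInt32(&c.rxdownFlag, 1)
	for {
		select {
		case <-c.rxq:
			drained++
		default:
			if atomic.LoadInt32(&c.rxqActive) == 0 && len(c.rxq) == 0 {
				return drained
			}
		}
	}
}

func main() {
	c := &conn{rxq: make(chan int, 1)}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() { // serveRecv stand-in, racing with the shutdown below
		defer wg.Done()
		for pkt := 0; pkt < 1000; pkt++ {
			c.route(pkt)
		}
	}()
	n := c.downRX()
	wg.Wait()
	// Any route() that passed its flag check either finished before the
	// drain loop exited or was drained by it; later calls drop the packet.
	fmt.Println("drained:", n, "left:", len(c.rxq)) // left: 0
}
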
@@ -563,6 +577,26 @@ func (nl *NodeLink) serveRecv() {
 			conn.shutdownRX(errConnClosed)
 		}
 
+		// route packet to serving goroutine handler
+		//
+		// TODO backpressure when Recv is not keeping up with Send on peer side?
+		// (not to let whole link starve because of one connection)
+		//
+		// NOTE rxq must be buffered with at least 1 element so that
+		// queuing pkt succeeds for incoming connection that is not yet
+		// there in acceptq.
+		atomic.StoreInt32(&conn.rxqActive, 1)
+		rxdown := atomic.LoadInt32(&conn.rxdownFlag) != 0
+		if !rxdown {
+			conn.rxq <- pkt
+		}
+		atomic.StoreInt32(&conn.rxqActive, 0)
+
+/*
+		// XXX goes away in favour of .rxdownFlag; reasons
+		//	- no need to reallocate rxdown for light conn
+		//	- no select
 		// don't even try `conn.rxq <- ...` if conn.rxdown is ready
 		// ( else since select is picking random ready variant Recv/serveRecv
 		// could receive something on rxdown Conn sometimes )
@@ -592,6 +626,7 @@ func (nl *NodeLink) serveRecv() {
 			// ok
 			}
 		}
+*/
 
 		// we are not accepting packet in any way
 		if rxdown {
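
Regarding the NOTE above that rxq must be buffered with at least one element: serveRecv queues the first packet of a newly created incoming connection before any goroutine has accepted it and called Recv, so with an unbuffered channel that send would block serveRecv, and with it the whole link. A tiny sketch of the difference (plain int channels, nothing neo-specific):

package main

import "fmt"

func main() {
	buffered := make(chan int, 1) // rxq as the NOTE requires: cap >= 1
	unbuffered := make(chan int)  // hypothetical unbuffered rxq

	// serveRecv queues the packet; no Recv caller exists yet.
	buffered <- 1
	fmt.Println("queued without a receiver:", len(buffered)) // 1

	select {
	case unbuffered <- 1:
		fmt.Println("unreachable: an unbuffered send needs a ready receiver")
	default:
		fmt.Println("with an unbuffered rxq serveRecv would block here")
	}
}
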
@@ -1356,7 +1391,7 @@ func (req *Request) Close() error {
 }
 
-// Send1 sends one message over link
+// Send1 sends one message over link.
 //
 // XXX doc
 func (link *NodeLink) Send1(msg Msg) error {