Commit e055406a authored by Kirill Smelkov's avatar Kirill Smelkov

X no select for acceptq - similarly for rxq path

- BenchmarkLinkNetPipeRTT-4         500000              3555 ns/op             225 B/op          5 allocs/op
+ BenchmarkLinkNetPipeRTT-4         500000              3189 ns/op             225 B/op          5 allocs/op
parent c28ad4d0
...@@ -62,11 +62,15 @@ type NodeLink struct { ...@@ -62,11 +62,15 @@ type NodeLink struct {
nextConnId uint32 // next connId to use for Conn initiated by us nextConnId uint32 // next connId to use for Conn initiated by us
serveWg sync.WaitGroup // for serve{Send,Recv} serveWg sync.WaitGroup // for serve{Send,Recv}
acceptq chan *Conn // queue of incoming connections for Accept
txq chan txReq // tx requests from Conns go via here txq chan txReq // tx requests from Conns go via here
// (rx packets are routed to Conn.rxq) // (rx packets are routed to Conn.rxq)
axdown chan struct{} // ready when accept is marked as no longer operational acceptq chan *Conn // queue of incoming connections for Accept
axqWrite atomic32 // 1 while serveRecv is doing `acceptq <- ...`
axqRead atomic32 // +1 while Accept is doing `... <- acceptq`
axdownFlag atomic32 // 1 when AX is marked no longer operational
// axdown chan struct{} // ready when accept is marked as no longer operational
axdown1 sync.Once // CloseAccept may be called severall times axdown1 sync.Once // CloseAccept may be called severall times
down chan struct{} // ready when NodeLink is marked as no longer operational down chan struct{} // ready when NodeLink is marked as no longer operational
...@@ -192,7 +196,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink { ...@@ -192,7 +196,7 @@ func newNodeLink(conn net.Conn, role LinkRole) *NodeLink {
nextConnId: nextConnId, nextConnId: nextConnId,
acceptq: make(chan *Conn), // XXX +buf acceptq: make(chan *Conn), // XXX +buf
txq: make(chan txReq), txq: make(chan txReq),
axdown: make(chan struct{}), // axdown: make(chan struct{}),
down: make(chan struct{}), down: make(chan struct{}),
} }
if role&linkNoRecvSend == 0 { if role&linkNoRecvSend == 0 {
...@@ -308,13 +312,22 @@ func (nl *NodeLink) NewConn() (*Conn, error) { ...@@ -308,13 +312,22 @@ func (nl *NodeLink) NewConn() (*Conn, error) {
// shutdownAX marks acceptq as no longer operational and interrupts Accept. // shutdownAX marks acceptq as no longer operational and interrupts Accept.
func (link *NodeLink) shutdownAX() { func (link *NodeLink) shutdownAX() {
link.axdown1.Do(func() { link.axdown1.Do(func() {
close(link.axdown) // close(link.axdown)
link.axdownFlag.Set(1) // XXX cmpxchg and return if already down?
// drain all connections from .acceptq:
// - something could be already buffered there
// - serveRecv could start writing acceptq at the same time we set axdownFlag; we derace it
// dequeue all connections already queued in link.acceptq
// (once serveRecv sees link.axdown it won't try to put new connections into
// link.acceptq, but something finite could be already there)
loop:
for { for {
// if serveRecv is outside `.acceptq <- ...` critical
// region and fully drained - we are done.
// see description of the logic in shutdownRX
if link.axqWrite.Get() == 0 && len(link.acceptq) == 0 {
break
}
select { select {
case conn := <-link.acceptq: case conn := <-link.acceptq:
// serveRecv already put at least 1 packet into conn.rxq before putting // serveRecv already put at least 1 packet into conn.rxq before putting
...@@ -326,7 +339,24 @@ loop: ...@@ -326,7 +339,24 @@ loop:
link.connMu.Unlock() link.connMu.Unlock()
default: default:
break loop // ok - continue spinning
}
}
// wakeup Accepts
for {
// similarly to above: .axdownFlag vs .axqRead
// see logic description in shutdownRX
if link.axqRead.Get() == 0 {
break
}
select {
case link.acceptq <- nil:
// ok - woken up
default:
// ok - continue spinning
} }
} }
}) })
...@@ -450,7 +480,7 @@ func (c *Conn) downRX(errMsg *Error) { ...@@ -450,7 +480,7 @@ func (c *Conn) downRX(errMsg *Error) {
go c.link.replyNoConn(c.connId, errMsg) go c.link.replyNoConn(c.connId, errMsg)
} }
// wakeup readers // wakeup recvPkt(s)
for { for {
// similarly to above: // similarly to above:
// we set .rxdownFlag=1 // we set .rxdownFlag=1
...@@ -562,24 +592,36 @@ func (link *NodeLink) errAcceptShutdownAX() error { ...@@ -562,24 +592,36 @@ func (link *NodeLink) errAcceptShutdownAX() error {
} }
// Accept waits for and accepts incoming connection on top of node-node link. // Accept waits for and accepts incoming connection on top of node-node link.
func (nl *NodeLink) Accept(/*ctx context.Context*/) (*Conn, error) { func (nl *NodeLink) Accept() (*Conn, error) {
select { // semantically equivalent to the following:
case <-nl.axdown: // ( this is hot path for py compatibility mode because new connection
return nil, nl.err("accept", nl.errAcceptShutdownAX()) // is established in every message and select hurts performance )
//
// select {
// case <-nl.axdown:
// return nil, nl.err("accept", nl.errAcceptShutdownAX())
//
// case c := <-nl.acceptq:
// return c, nil
// }
case c := <-nl.acceptq: var conn *Conn
return c, nil var err error
// XXX for long-lived links - better to propagate ctx cancel to link.Close to nl.axqRead.Add(1)
// lower cases that are run at every select. axdown := nl.axdownFlag.Get() != 0
// if !axdown {
// XXX see xio.CloseWhenDone() for helper for this. conn = <-nl.acceptq
/*
// XXX ctx cancel tests
case <-ctx.Done():
return nil, ctx.Err()
*/
} }
nl.axqRead.Add(-1)
// in contrast to recvPkt we can decide about error after releasing axqRead
// reason: link is not going to be released to a free pool.
if axdown || conn == nil {
err = nl.err("accept", nl.errAcceptShutdownAX())
}
return conn, err
} }
// errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt // errRecvShutdown returns appropriate error when c.rxdown is found ready in recvPkt
...@@ -610,9 +652,6 @@ func (c *Conn) errRecvShutdown() error { ...@@ -610,9 +652,6 @@ func (c *Conn) errRecvShutdown() error {
// recvPkt receives raw packet from connection // recvPkt receives raw packet from connection
func (c *Conn) recvPkt() (*PktBuf, error) { func (c *Conn) recvPkt() (*PktBuf, error) {
var pkt *PktBuf
var err error
// semantically equivalent to the following: // semantically equivalent to the following:
// (this is hot path and select is not used for performance reason) // (this is hot path and select is not used for performance reason)
// //
...@@ -624,19 +663,21 @@ func (c *Conn) recvPkt() (*PktBuf, error) { ...@@ -624,19 +663,21 @@ func (c *Conn) recvPkt() (*PktBuf, error) {
// return pkt, nil // return pkt, nil
// } // }
var pkt *PktBuf
var err error
c.rxqRead.Add(1) c.rxqRead.Add(1)
rxdown := c.rxdownFlag.Get() != 0 rxdown := c.rxdownFlag.Get() != 0
if !rxdown { if !rxdown {
pkt = <-c.rxq pkt = <-c.rxq
} }
// decide about error while under rxqRead - if it was after the Conn can go away to be released // decide about error while under rxqRead (if after - the Conn can go away to be released)
if rxdown || pkt == nil { if rxdown || pkt == nil {
err = c.err("recv", c.errRecvShutdown()) err = c.err("recv", c.errRecvShutdown())
} }
c.rxqRead.Add(-1) c.rxqRead.Add(-1)
return pkt, err return pkt, err
} }
...@@ -733,34 +774,16 @@ func (nl *NodeLink) serveRecv() { ...@@ -733,34 +774,16 @@ func (nl *NodeLink) serveRecv() {
// this packet established new connection - try to accept it // this packet established new connection - try to accept it
if accept { if accept {
// don't even try `link.acceptq <- ...` if link.axdown is ready nl.axqWrite.Set(1)
// ( else since select is picking random ready variant Accept/serveRecv axdown := nl.axdownFlag.Get() != 0
// could receive something on axdown Link sometimes )
axdown := false
select {
case <-nl.axdown:
axdown = true
default:
// ok
}
// put conn to .acceptq
if !axdown { if !axdown {
// XXX can avoid select here if shutdownAX cares to drain acceptq (?) nl.acceptq <- conn
select {
case <-nl.axdown:
axdown = true
case nl.acceptq <- conn:
//fmt.Printf("%p\t.acceptq <- conn ok\n", nl)
// ok
}
} }
nl.axqWrite.Set(0)
// we are not accepting the connection // we are not accepting the connection
if axdown { if axdown {
conn.shutdownRX(errConnRefused) go nl.replyNoConn(connId, errConnRefused)
nl.connMu.Lock() nl.connMu.Lock()
delete(nl.connTab, conn.connId) delete(nl.connTab, conn.connId)
nl.connMu.Unlock() nl.connMu.Unlock()
...@@ -802,6 +825,44 @@ func (nl *NodeLink) serveRecv() { ...@@ -802,6 +825,44 @@ func (nl *NodeLink) serveRecv() {
// ok // ok
} }
} }
...
// this packet established new connection - try to accept it
if accept {
// don't even try `link.acceptq <- ...` if link.axdown is ready
// ( else since select is picking random ready variant Accept/serveRecv
// could receive something on axdown Link sometimes )
axdown := false
select {
case <-nl.axdown:
axdown = true
default:
// ok
}
// put conn to .acceptq
if !axdown {
// XXX can avoid select here if shutdownAX cares to drain acceptq (?)
select {
case <-nl.axdown:
axdown = true
case nl.acceptq <- conn:
//fmt.Printf("%p\t.acceptq <- conn ok\n", nl)
// ok
}
}
// we are not accepting the connection
if axdown {
conn.shutdownRX(errConnRefused)
nl.connMu.Lock()
delete(nl.connTab, conn.connId)
nl.connMu.Unlock()
}
}
*/ */
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment