Commit d39a6fc3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7dcbc9c5
...@@ -410,24 +410,34 @@ func (c *Conn) shutdownRX(errMsg *Error) { ...@@ -410,24 +410,34 @@ func (c *Conn) shutdownRX(errMsg *Error) {
} }
// downRX marks .rxq as no longer operational. // downRX marks .rxq as no longer operational.
//
// used in shutdownRX and separately to mark RX down for light Conns.
func (c *Conn) downRX(errMsg *Error) { func (c *Conn) downRX(errMsg *Error) {
// let serveRecv know RX is down for this connection
c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down? c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?
// dequeue all packets already queued in c.rxq // drain all packets from .rxq:
// (once serveRecv sees c.rxdownFlag it won't try to put new packets into // - something could be already buffered there
// c.rxq, but something finite could be already there) // - serveRecv could start writing rxq at the same time we set rxdownFlag; we derace it.
// XXX also describe race we avoid with rxdownFlag / rxqActive dance
i := 0 i := 0
loop:
for { for {
// we set .rxdownFlag=1 above.
// now if serveRecv is outside `.rxq <- ...` critical section we know it is either:
// - before it -> it will eventually see .rxdownFlag=1 and wont send pkt ro rxq.
// - after it -> it already sent pkt to rxq and won't touch
// rxq until next packet (where it will hit "before it").
//
// when serveRecv stopped sending we know we are done draining when rxq is empty.
if c.rxqActive.Get() == 0 && len(c.rxq) == 0 {
break
}
select { select {
case <-c.rxq: case <-c.rxq:
i++ i++
default: default:
if c.rxqActive.Get() == 0 && len(c.rxq) == 0 { // ok - continue spinning
break loop
}
} }
} }
...@@ -1204,32 +1214,6 @@ func (l *linkListener) Addr() net.Addr { ...@@ -1204,32 +1214,6 @@ func (l *linkListener) Addr() net.Addr {
return l.l.Addr() return l.l.Addr()
} }
/*
XXX do if this is needed in a second place besides talkMaster1
// ---- Listen for single Conn over NodeLink use-cases ----
// XXX
func ListenSingleConn(link *NodeLink) ConnListener {
l := &listen1conn{link}
// XXX go ...
return l
}
// ConnListener XXX ...
type ConnListener interface {
// XXX +Close, Addr ?
// Accept returns new connection multiplexed over NodeLink
Accept() (*Conn, error)
}
type listen1conn struct {
link *NodeLink
}
func ...
*/
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ---- // ---- for convenience: Conn -> NodeLink & local/remote link addresses ----
...@@ -1350,6 +1334,8 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1350,6 +1334,8 @@ func (c *Conn) Recv() (Msg, error) {
// sendMsg sends message with specified connection ID. // sendMsg sends message with specified connection ID.
// //
// it encodes message int packet, sets header appropriately and sends it. // it encodes message int packet, sets header appropriately and sends it.
//
// it is ok to call sendMsg in parallel with serveSend running XXX why
func (link *NodeLink) sendMsg(connId uint32, msg Msg) error { func (link *NodeLink) sendMsg(connId uint32, msg Msg) error {
traceMsgSendPre(link, connId, msg) traceMsgSendPre(link, connId, msg)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment