Commit 4ed82496 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent da2025b4
...@@ -98,7 +98,7 @@ type Conn struct { ...@@ -98,7 +98,7 @@ type Conn struct {
rxdownFlag atomic32 // 1 when RX is marked no longer operational rxdownFlag atomic32 // 1 when RX is marked no longer operational
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light? rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light?
errMsg *Error // error message for peer if rx is down XXX try to do without it // errMsg *Error // error message for peer if rx is down XXX try to do without it
// there are two modes a Conn could be used: // there are two modes a Conn could be used:
// - full mode - where full Conn functionality is working, and // - full mode - where full Conn functionality is working, and
...@@ -237,7 +237,7 @@ func (c *Conn) reinit() { ...@@ -237,7 +237,7 @@ func (c *Conn) reinit() {
c.rxdownFlag.Set(0) // ----//---- c.rxdownFlag.Set(0) // ----//----
c.rxerrOnce = sync.Once{} // XXX ok? c.rxerrOnce = sync.Once{} // XXX ok?
c.errMsg = nil // XXX what here? // c.errMsg = nil // XXX what here?
// XXX vvv not strictly needed for light mode? // XXX vvv not strictly needed for light mode?
...@@ -413,12 +413,13 @@ func (c *Conn) shutdownRX(errMsg *Error) { ...@@ -413,12 +413,13 @@ func (c *Conn) shutdownRX(errMsg *Error) {
// downRX marks .rxq as no longer operational. // downRX marks .rxq as no longer operational.
func (c *Conn) downRX(errMsg *Error) { func (c *Conn) downRX(errMsg *Error) {
c.errMsg = errMsg // c.errMsg = errMsg
c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down? c.rxdownFlag.Set(1) // XXX cmpxchg and return if already down?
// dequeue all packets already queued in c.rxq // dequeue all packets already queued in c.rxq
// (once serveRecv sees c.rxdown it won't try to put new packets into // (once serveRecv sees c.rxdownFlag it won't try to put new packets into
// c.rxq, but something finite could be already there) // c.rxq, but something finite could be already there)
// XXX also describe race we avoid with rxdownFlag / rxqActive dance
i := 0 i := 0
loop: loop:
for { for {
...@@ -435,7 +436,7 @@ loop: ...@@ -435,7 +436,7 @@ loop:
// if something was queued already there - reply "connection closed" // if something was queued already there - reply "connection closed"
if i != 0 { if i != 0 {
go c.replyNoConn() go c.link.replyNoConn(c.connId, errMsg)
} }
} }
...@@ -635,28 +636,24 @@ func (nl *NodeLink) serveRecv() { ...@@ -635,28 +636,24 @@ func (nl *NodeLink) serveRecv() {
// resetting it waits for us to finish. // resetting it waits for us to finish.
conn := nl.connTab[connId] conn := nl.connTab[connId]
tmpclosed := false
if conn == nil { if conn == nil {
// "new" connection will be needed in all cases - e.g.
// even temporarily to reply "connection refused"
conn = nl.newConn(connId)
// message with connid that should be initiated by us
if connId % 2 == nl.nextConnId % 2 {
tmpclosed = true
delete(nl.connTab, conn.connId)
// message with connid for a stream initiated by peer // message with connid for a stream initiated by peer
// it will be considered to be accepted (not if .axdown) // it will be considered to be accepted (not if .axdown)
} else { if connId % 2 != nl.nextConnId % 2 {
accept = true accept = true
conn = nl.newConn(connId)
} }
// else it is message with connid that should be initiated by us
// leave conn=nil - we'll reply errConnClosed
} }
nl.connMu.Unlock() nl.connMu.Unlock()
if tmpclosed { if conn == nil {
conn.shutdownRX(errConnClosed) // see ^^^ "message with connid that should be initiated by us"
go nl.replyNoConn(connId, errConnClosed)
continue
} }
// route packet to serving goroutine handler // route packet to serving goroutine handler
...@@ -674,49 +671,14 @@ func (nl *NodeLink) serveRecv() { ...@@ -674,49 +671,14 @@ func (nl *NodeLink) serveRecv() {
} }
conn.rxqActive.Set(0) conn.rxqActive.Set(0)
/*
// XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn
// - no select
// don't even try `conn.rxq <- ...` if conn.rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv
// could receive something on rxdown Conn sometimes )
rxdown := false
select {
case <-conn.rxdown:
rxdown = true
default:
// ok
}
// route packet to serving goroutine handler
//
// TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole link starve because of one connection)
//
// NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet
// there in acceptq.
if !rxdown {
// XXX can avoid select here: if conn closer cares to drain rxq (?)
select {
case <-conn.rxdown:
rxdown = true
case conn.rxq <- pkt: // conn exists but rx is down - "connection closed"
// ok // (this cannot happen for newly accepted connection)
}
}
*/
// we are not accepting packet in any way
if rxdown { if rxdown {
go conn.replyNoConn() go nl.replyNoConn(connId, errConnClosed)
continue continue
} }
// this packet established new connection - try to accept it // this packet established new connection - try to accept it
if accept { if accept {
// don't even try `link.acceptq <- ...` if link.axdown is ready // don't even try `link.acceptq <- ...` if link.axdown is ready
...@@ -751,6 +713,41 @@ func (nl *NodeLink) serveRecv() { ...@@ -751,6 +713,41 @@ func (nl *NodeLink) serveRecv() {
nl.connMu.Unlock() nl.connMu.Unlock()
} }
} }
/*
// XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn
// - no select
// don't even try `conn.rxq <- ...` if conn.rxdown is ready
// ( else since select is picking random ready variant Recv/serveRecv
// could receive something on rxdown Conn sometimes )
rxdown := false
select {
case <-conn.rxdown:
rxdown = true
default:
// ok
}
// route packet to serving goroutine handler
//
// TODO backpressure when Recv is not keeping up with Send on peer side?
// (not to let whole link starve because of one connection)
//
// NOTE rxq must be buffered with at least 1 element so that
// queuing pkt succeeds for incoming connection that is not yet
// there in acceptq.
if !rxdown {
// XXX can avoid select here: if conn closer cares to drain rxq (?)
select {
case <-conn.rxdown:
rxdown = true
case conn.rxq <- pkt:
// ok
}
}
*/
} }
} }
...@@ -760,10 +757,10 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"} ...@@ -760,10 +757,10 @@ var errConnClosed = &Error{PROTOCOL_ERROR, "connection closed"}
var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (c *Conn) replyNoConn() { func (link *NodeLink) replyNoConn(connId uint32, errMsg Msg) {
//fmt.Printf("%v: -> replyNoConn %v\n", c, c.errMsg) //fmt.Printf("%s .%d: -> replyNoConn %v\n", link, connId, c.errMsg)
c.Send(c.errMsg) // ignore errors link.sendMsg(connId, errMsg) // ignore errors
//fmt.Printf("%v: replyNoConn(%v) -> %v\n", c, c.errMsg, err) //fmt.Printf("%s .%d: replyNoConn(%v) -> %v\n", link, connId, c.errMsg, err)
} }
// ---- transmit ---- // ---- transmit ----
...@@ -800,34 +797,12 @@ func (c *Conn) sendPkt(pkt *PktBuf) error { ...@@ -800,34 +797,12 @@ func (c *Conn) sendPkt(pkt *PktBuf) error {
return c.err("send", err) return c.err("send", err)
} }
// XXX serveSend is not needed - Conn.Write already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
/*
func (c *Conn) sendPkt2(pkt *PktBuf) error { func (c *Conn) sendPkt2(pkt *PktBuf) error {
// set pkt connId associated with this connection // connId must be set to one associated with this connection
pkt.Header().ConnId = hton32(c.connId) if pkt.Header().ConnId != hton32(c.connId) {
panic("Conn.sendPkt: connId wrong")
// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := c.link.sendPkt(pkt)
//fmt.Printf("sendPkt -> %v\n", err)
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
if err != nil {
c.link.shutdown()
} }
return err
}
*/
///*
func (c *Conn) sendPkt2(pkt *PktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId)
var err error var err error
select { select {
...@@ -885,6 +860,9 @@ func (nl *NodeLink) serveSend() { ...@@ -885,6 +860,9 @@ func (nl *NodeLink) serveSend() {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it. // so we shut down node link and all connections over it.
//
// XXX dup wrt sendPktDirect
// XXX move to link.sendPkt?
if err != nil { if err != nil {
nl.shutdown() nl.shutdown()
return return
...@@ -892,7 +870,37 @@ func (nl *NodeLink) serveSend() { ...@@ -892,7 +870,37 @@ func (nl *NodeLink) serveSend() {
} }
} }
} }
//*/
// ---- transmit direct mode ----
// serveSend is not strictly needed - net.Conn.Write already can be used by multiple
// goroutines simultaneously and works atomically; (same for Conn.Read etc - see pool.FD)
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/net/net.go#L109
// https://github.com/golang/go/blob/go1.9-3-g099336988b/src/internal/poll/fd_unix.go#L14
//
// thus the only reason we use serveSend is so that Conn.Close can "interrupt" Conn.Send via .txdown .
// however this adds overhead and is not needed in light mode.
// sendPktDirect sends raw packet with appropriate connection ID directly via link.
func (c *Conn) sendPktDirect(pkt *PktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId)
// XXX if n.peerLink was just closed by rx->shutdown we'll get ErrNetClosing
err := c.link.sendPkt(pkt)
//fmt.Printf("sendPkt -> %v\n", err)
// on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it.
//
// XXX dup wrt serveSend
// XXX move to link.sendPkt?
if err != nil {
c.link.shutdown()
}
return err
}
// ---- raw IO ---- // ---- raw IO ----
...@@ -1300,10 +1308,26 @@ func (c *Conn) err(op string, e error) error { ...@@ -1300,10 +1308,26 @@ func (c *Conn) err(op string, e error) error {
// ---- exchange of messages ---- // ---- exchange of messages ----
//trace:event traceConnRecv(c *Conn, msg Msg) //trace:event traceMsgRecv(c *Conn, msg Msg)
//trace:event traceConnSendPre(c *Conn, msg Msg) //trace:event traceMsgSendPre(l *NodeLink, connId uint32, msg Msg)
// XXX do we also need traceConnSend? // XXX do we also need traceConnSend?
// msgPack allocates PktBuf and encodes msg into it.
func msgPack(connId uint32, msg Msg) *PktBuf {
l := msg.neoMsgEncodedLen()
buf := pktAlloc(pktHeaderLen+l)
h := buf.Header()
h.ConnId = hton32(connId)
h.MsgCode = hton16(msg.neoMsgCode())
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
msg.neoMsgEncode(buf.Payload())
return buf
}
// TODO msgUnpack
// Recv receives message // Recv receives message
// it receives packet and decodes message from it // it receives packet and decodes message from it
func (c *Conn) Recv() (Msg, error) { func (c *Conn) Recv() (Msg, error) {
...@@ -1333,28 +1357,28 @@ func (c *Conn) Recv() (Msg, error) { ...@@ -1333,28 +1357,28 @@ func (c *Conn) Recv() (Msg, error) {
return nil, c.err("decode", err) // XXX "decode:" is already in ErrDecodeOverflow return nil, c.err("decode", err) // XXX "decode:" is already in ErrDecodeOverflow
} }
traceConnRecv(c, msg) traceMsgRecv(c, msg)
return msg, nil return msg, nil
} }
// Send sends message. // sendMsg sends message with specified connection ID.
// //
// it encodes message into packet and sends it // it encodes message int packet, sets header appropriately and sends it.
func (c *Conn) Send(msg Msg) error { func (link *NodeLink) sendMsg(connId uint32, msg Msg) error {
traceConnSendPre(c, msg) traceMsgSendPre(link, connId, msg)
l := msg.neoMsgEncodedLen()
buf := pktAlloc(pktHeaderLen+l)
h := buf.Header() buf := msgPack(connId, msg)
// h.ConnId will be set by conn.sendPkt return link.sendPkt(buf) // XXX more context in err? (msg type)
h.MsgCode = hton16(msg.neoMsgCode()) }
h.MsgLen = hton32(uint32(l)) // XXX casting: think again
msg.neoMsgEncode(buf.Payload()) // Send sends message.
//
// it encodes message into packet and sends it.
func (c *Conn) Send(msg Msg) error {
traceMsgSendPre(c.link, c.connId, msg)
// XXX more context in err? (msg type) buf := msgPack(c.connId, msg)
return c.sendPkt(buf) return c.sendPkt(buf) // XXX more context in err? (msg type)
} }
......
...@@ -37,6 +37,8 @@ import ( ...@@ -37,6 +37,8 @@ import (
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"github.com/pkg/errors" "github.com/pkg/errors"
//"runtime/debug"
) )
func xclose(c io.Closer) { func xclose(c io.Closer) {
...@@ -106,9 +108,9 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf { ...@@ -106,9 +108,9 @@ func _mkpkt(connid uint32, msgcode uint16, payload []byte) *PktBuf {
return pkt return pkt
} }
func mkpkt(msgcode uint16, payload []byte) *PktBuf { func (c *Conn) mkpkt(msgcode uint16, payload []byte) *PktBuf {
// in Conn exchange connid is automatically set by Conn.sendPkt // in Conn exchange connid is automatically set by Conn.sendPkt
return _mkpkt(0, msgcode, payload) return _mkpkt(c.connId, msgcode, payload)
} }
// Verify PktBuf is as expected // Verify PktBuf is as expected
...@@ -321,7 +323,7 @@ func TestNodeLink(t *testing.T) { ...@@ -321,7 +323,7 @@ func TestNodeLink(t *testing.T) {
tdelay() tdelay()
xclose(c) xclose(c)
}) })
pkt = &PktBuf{[]byte("data")} pkt = c.mkpkt(0, []byte("data"))
err = c.sendPkt(pkt) err = c.sendPkt(pkt)
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt() after close: err = %v", err) t.Fatalf("Conn.sendPkt() after close: err = %v", err)
...@@ -341,7 +343,7 @@ func TestNodeLink(t *testing.T) { ...@@ -341,7 +343,7 @@ func TestNodeLink(t *testing.T) {
} }
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := c12.mkpkt(0, []byte("data"))
err := c12.sendPkt(pkt) err := c12.sendPkt(pkt)
if xconnError(err) != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
exc.Raisef("Conn.sendPkt() after NodeLink close: err = %v", err) exc.Raisef("Conn.sendPkt() after NodeLink close: err = %v", err)
...@@ -375,7 +377,7 @@ func TestNodeLink(t *testing.T) { ...@@ -375,7 +377,7 @@ func TestNodeLink(t *testing.T) {
errRecv = cerr errRecv = cerr
}) })
wg.Gox(func() { wg.Gox(func() {
pkt := &PktBuf{[]byte("data")} pkt := c22.mkpkt(0, []byte("data"))
err := c22.sendPkt(pkt) err := c22.sendPkt(pkt)
want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2 want := io.ErrClosedPipe // always this in both due to peer close or recvPkt waking up and closing nl2
if xconnError(err) != want { if xconnError(err) != want {
...@@ -413,7 +415,7 @@ func TestNodeLink(t *testing.T) { ...@@ -413,7 +415,7 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && xconnError(err) == errRecv) { if !(pkt == nil && xconnError(err) == errRecv) {
t.Fatalf("Conn.recvPkt 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt 2 after peer NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c23.sendPkt(&PktBuf{[]byte("data")}) err = c23.sendPkt(c23.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt 2 after peer NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt 2 after peer NodeLink shutdown: %v", err)
} }
...@@ -423,7 +425,7 @@ func TestNodeLink(t *testing.T) { ...@@ -423,7 +425,7 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && xconnError(err) == ErrLinkDown) { if !(pkt == nil && xconnError(err) == ErrLinkDown) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(&PktBuf{[]byte("data")}) err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkDown { if xconnError(err) != ErrLinkDown {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
} }
...@@ -434,7 +436,7 @@ func TestNodeLink(t *testing.T) { ...@@ -434,7 +436,7 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && xconnError(err) == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close but only stopped NodeLink: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after close but only stopped NodeLink: pkt = %v err = %v", pkt, err)
} }
err = c23.sendPkt(&PktBuf{[]byte("data")}) err = c23.sendPkt(c23.mkpkt(0, []byte("data")))
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close but only stopped NodeLink: %v", err) t.Fatalf("Conn.sendPkt after close but only stopped NodeLink: %v", err)
} }
...@@ -445,7 +447,7 @@ func TestNodeLink(t *testing.T) { ...@@ -445,7 +447,7 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && xconnError(err) == ErrLinkClosed) { if !(pkt == nil && xconnError(err) == ErrLinkClosed) {
t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after NodeLink shutdown: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(&PktBuf{[]byte("data")}) err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrLinkClosed { if xconnError(err) != ErrLinkClosed {
t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err) t.Fatalf("Conn.sendPkt after NodeLink shutdown: %v", err)
} }
...@@ -468,7 +470,7 @@ func TestNodeLink(t *testing.T) { ...@@ -468,7 +470,7 @@ func TestNodeLink(t *testing.T) {
if !(pkt == nil && xconnError(err) == ErrClosedConn) { if !(pkt == nil && xconnError(err) == ErrClosedConn) {
t.Fatalf("Conn.recvPkt after close and NodeLink close: pkt = %v err = %v", pkt, err) t.Fatalf("Conn.recvPkt after close and NodeLink close: pkt = %v err = %v", pkt, err)
} }
err = c22.sendPkt(&PktBuf{[]byte("data")}) err = c22.sendPkt(c22.mkpkt(0, []byte("data")))
if xconnError(err) != ErrClosedConn { if xconnError(err) != ErrClosedConn {
t.Fatalf("Conn.sendPkt after close and NodeLink close: %v", err) t.Fatalf("Conn.sendPkt after close and NodeLink close: %v", err)
} }
...@@ -492,12 +494,12 @@ func TestNodeLink(t *testing.T) { ...@@ -492,12 +494,12 @@ func TestNodeLink(t *testing.T) {
xverifyPkt(pkt, c.connId, 33, []byte("ping")) xverifyPkt(pkt, c.connId, 33, []byte("ping"))
// change pkt a bit and send it back // change pkt a bit and send it back
xsendPkt(c, mkpkt(34, []byte("pong"))) xsendPkt(c, c.mkpkt(34, []byte("pong")))
// one more time // one more time
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
xverifyPkt(pkt, c.connId, 35, []byte("ping2")) xverifyPkt(pkt, c.connId, 35, []byte("ping2"))
xsendPkt(c, mkpkt(36, []byte("pong2"))) xsendPkt(c, c.mkpkt(36, []byte("pong2")))
xclose(c) xclose(c)
closed <- 1 closed <- 1
...@@ -509,7 +511,7 @@ func TestNodeLink(t *testing.T) { ...@@ -509,7 +511,7 @@ func TestNodeLink(t *testing.T) {
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
//println("X ααα + 2") //println("X ααα + 2")
xverifyPkt(pkt, c2.connId, 41, []byte("ping5")) xverifyPkt(pkt, c2.connId, 41, []byte("ping5"))
xsendPkt(c2, mkpkt(42, []byte("pong5"))) xsendPkt(c2, c2.mkpkt(42, []byte("pong5")))
//println("X βββ") //println("X βββ")
c2.CloseRecv() c2.CloseRecv()
...@@ -519,12 +521,12 @@ func TestNodeLink(t *testing.T) { ...@@ -519,12 +521,12 @@ func TestNodeLink(t *testing.T) {
// "connection refused" when trying to connect to not-listening peer // "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here? c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, mkpkt(38, []byte("pong3"))) xsendPkt(c, c.mkpkt(38, []byte("pong3")))
//println("X γγγ + 1") //println("X γγγ + 1")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 2") //println("X γγγ + 2")
xverifyPktMsg(pkt, c.connId, errConnRefused) xverifyPktMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, c.mkpkt(40, []byte("pong4"))) // once again
//println("X γγγ + 3") //println("X γγγ + 3")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 4") //println("X γγγ + 4")
...@@ -539,10 +541,10 @@ func TestNodeLink(t *testing.T) { ...@@ -539,10 +541,10 @@ func TestNodeLink(t *testing.T) {
//println("aaa") //println("aaa")
c1 := xnewconn(nl1) c1 := xnewconn(nl1)
xsendPkt(c1, mkpkt(33, []byte("ping"))) xsendPkt(c1, c1.mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 34, []byte("pong")) xverifyPkt(pkt, c1.connId, 34, []byte("pong"))
xsendPkt(c1, mkpkt(35, []byte("ping2"))) xsendPkt(c1, c1.mkpkt(35, []byte("ping2")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 36, []byte("pong2")) xverifyPkt(pkt, c1.connId, 36, []byte("pong2"))
...@@ -550,12 +552,12 @@ func TestNodeLink(t *testing.T) { ...@@ -550,12 +552,12 @@ func TestNodeLink(t *testing.T) {
// "connection closed" after peer closed its end // "connection closed" after peer closed its end
<-closed <-closed
//println("111 + closed") //println("111 + closed")
xsendPkt(c1, mkpkt(37, []byte("ping3"))) xsendPkt(c1, c1.mkpkt(37, []byte("ping3")))
//println("111 + 1") //println("111 + 1")
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 2") //println("111 + 2")
xverifyPktMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again xsendPkt(c1, c1.mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 4") //println("111 + 4")
xverifyPktMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
...@@ -564,11 +566,11 @@ func TestNodeLink(t *testing.T) { ...@@ -564,11 +566,11 @@ func TestNodeLink(t *testing.T) {
//println("222") //println("222")
// one more time but now peer does only .CloseRecv() // one more time but now peer does only .CloseRecv()
c2 := xnewconn(nl1) c2 := xnewconn(nl1)
xsendPkt(c2, mkpkt(41, []byte("ping5"))) xsendPkt(c2, c2.mkpkt(41, []byte("ping5")))
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
xverifyPkt(pkt, c2.connId, 42, []byte("pong5")) xverifyPkt(pkt, c2.connId, 42, []byte("pong5"))
<-closed <-closed
xsendPkt(c2, mkpkt(41, []byte("ping6"))) xsendPkt(c2, c2.mkpkt(41, []byte("ping6")))
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
xverifyPktMsg(pkt, c2.connId, errConnClosed) xverifyPktMsg(pkt, c2.connId, errConnClosed)
...@@ -638,8 +640,8 @@ func TestNodeLink(t *testing.T) { ...@@ -638,8 +640,8 @@ func TestNodeLink(t *testing.T) {
c1 = xnewconn(nl1) c1 = xnewconn(nl1)
c2 = xnewconn(nl1) c2 = xnewconn(nl1)
xsendPkt(c1, mkpkt(1, []byte(""))) xsendPkt(c1, c1.mkpkt(1, []byte("")))
xsendPkt(c2, mkpkt(2, []byte(""))) xsendPkt(c2, c2.mkpkt(2, []byte("")))
// replies must be coming in reverse order // replies must be coming in reverse order
xechoWait := func(c *Conn, msgCode uint16) { xechoWait := func(c *Conn, msgCode uint16) {
...@@ -754,16 +756,56 @@ func xRecv1(l *NodeLink) Request { ...@@ -754,16 +756,56 @@ func xRecv1(l *NodeLink) Request {
return req return req
} }
func xSend1(l *NodeLink, msg Msg) {
err := l.Send1(msg)
exc.Raiseif(err)
}
func xverifyMsg(msg1, msg2 Msg) { func xverifyMsg(msg1, msg2 Msg) {
if !reflect.DeepEqual(msg1, msg2) { if !reflect.DeepEqual(msg1, msg2) {
//debug.PrintStack()
exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2)) exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2))
} }
} }
func TestRecv1Mode(t *testing.T) { func TestRecv1Mode(t *testing.T) {
// Recv1: further packets with same connid are rejected with "connection closed" println("000")
// Send1
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &xsync.WorkGroup{} wg := &xsync.WorkGroup{}
wg.Gox(func() {
c := xaccept(nl2)
msg := xRecv(c)
xverifyMsg(msg, &Ping{})
xSend(c, &Pong{})
msg = xRecv(c)
xverifyMsg(msg, errConnClosed)
xclose(c)
println("X zzz")
c = xaccept(nl2)
msg = xRecv(c)
println("X zzz + 1")
xverifyMsg(msg, &Error{ACK, "hello up there"})
xSend(c, &Error{ACK, "hello to you too"})
msg = xRecv(c)
println("X zzz + 2")
xverifyMsg(msg, errConnClosed)
println("X zzz + 3")
})
println("aaa")
xSend1(nl1, &Ping{})
println("bbb")
xSend1(nl1, &Error{ACK, "hello up there"})
println("ccc")
xwait(wg)
println("111")
// Recv1: further packets with same connid are rejected with "connection closed"
wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
c := xnewconn(nl2) c := xnewconn(nl2)
...@@ -775,6 +817,8 @@ func TestRecv1Mode(t *testing.T) { ...@@ -775,6 +817,8 @@ func TestRecv1Mode(t *testing.T) {
_ = xRecv1(nl1) _ = xRecv1(nl1)
xwait(wg) xwait(wg)
// TODO link.Close vs Recv1
} }
// ---- benchmarks ---- // ---- benchmarks ----
......
...@@ -35,57 +35,57 @@ func traceClusterStateChanged_Attach(pg *tracing.ProbeGroup, probe func(cs *Clus ...@@ -35,57 +35,57 @@ func traceClusterStateChanged_Attach(pg *tracing.ProbeGroup, probe func(cs *Clus
return &p.Probe return &p.Probe
} }
// traceevent: traceConnRecv(c *Conn, msg Msg) // traceevent: traceMsgRecv(c *Conn, msg Msg)
type _t_traceConnRecv struct { type _t_traceMsgRecv struct {
tracing.Probe tracing.Probe
probefunc func(c *Conn, msg Msg) probefunc func(c *Conn, msg Msg)
} }
var _traceConnRecv *_t_traceConnRecv var _traceMsgRecv *_t_traceMsgRecv
func traceConnRecv(c *Conn, msg Msg) { func traceMsgRecv(c *Conn, msg Msg) {
if _traceConnRecv != nil { if _traceMsgRecv != nil {
_traceConnRecv_run(c, msg) _traceMsgRecv_run(c, msg)
} }
} }
func _traceConnRecv_run(c *Conn, msg Msg) { func _traceMsgRecv_run(c *Conn, msg Msg) {
for p := _traceConnRecv; p != nil; p = (*_t_traceConnRecv)(unsafe.Pointer(p.Next())) { for p := _traceMsgRecv; p != nil; p = (*_t_traceMsgRecv)(unsafe.Pointer(p.Next())) {
p.probefunc(c, msg) p.probefunc(c, msg)
} }
} }
func traceConnRecv_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg)) *tracing.Probe { func traceMsgRecv_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg)) *tracing.Probe {
p := _t_traceConnRecv{probefunc: probe} p := _t_traceMsgRecv{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceConnRecv)), &p.Probe) tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgRecv)), &p.Probe)
return &p.Probe return &p.Probe
} }
// traceevent: traceConnSendPre(c *Conn, msg Msg) // traceevent: traceMsgSendPre(l *NodeLink, connId uint32, msg Msg)
type _t_traceConnSendPre struct { type _t_traceMsgSendPre struct {
tracing.Probe tracing.Probe
probefunc func(c *Conn, msg Msg) probefunc func(l *NodeLink, connId uint32, msg Msg)
} }
var _traceConnSendPre *_t_traceConnSendPre var _traceMsgSendPre *_t_traceMsgSendPre
func traceConnSendPre(c *Conn, msg Msg) { func traceMsgSendPre(l *NodeLink, connId uint32, msg Msg) {
if _traceConnSendPre != nil { if _traceMsgSendPre != nil {
_traceConnSendPre_run(c, msg) _traceMsgSendPre_run(l, connId, msg)
} }
} }
func _traceConnSendPre_run(c *Conn, msg Msg) { func _traceMsgSendPre_run(l *NodeLink, connId uint32, msg Msg) {
for p := _traceConnSendPre; p != nil; p = (*_t_traceConnSendPre)(unsafe.Pointer(p.Next())) { for p := _traceMsgSendPre; p != nil; p = (*_t_traceMsgSendPre)(unsafe.Pointer(p.Next())) {
p.probefunc(c, msg) p.probefunc(l, connId, msg)
} }
} }
func traceConnSendPre_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg)) *tracing.Probe { func traceMsgSendPre_Attach(pg *tracing.ProbeGroup, probe func(l *NodeLink, connId uint32, msg Msg)) *tracing.Probe {
p := _t_traceConnSendPre{probefunc: probe} p := _t_traceMsgSendPre{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceConnSendPre)), &p.Probe) tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMsgSendPre)), &p.Probe)
return &p.Probe return &p.Probe
} }
...@@ -117,4 +117,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n ...@@ -117,4 +117,4 @@ func traceNodeChanged_Attach(pg *tracing.ProbeGroup, probe func(nt *NodeTable, n
} }
// trace export signature // trace export signature
func _trace_exporthash_ab325b43be064a06d1c80db96d5bf50678b5b037() {} func _trace_exporthash_933f43c04bbb1566c5d1e9ea518f9ed6e0f147a7() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment