Commit dba74d2d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4ed82496
...@@ -613,7 +613,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -613,7 +613,7 @@ func (nl *NodeLink) serveRecv() {
// receive 1 packet // receive 1 packet
// XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing // XXX if nl.peerLink was just closed by tx->shutdown we'll get ErrNetClosing
pkt, err := nl.recvPkt() pkt, err := nl.recvPkt()
//fmt.Printf("recvPkt -> %v, %v\n", pkt, err) //fmt.Printf("\n%p recvPkt -> %v, %v\n", nl, pkt, err)
if err != nil { if err != nil {
// on IO error framing over peerLink becomes broken // on IO error framing over peerLink becomes broken
// so we shut down node link and all connections over it. // so we shut down node link and all connections over it.
...@@ -650,6 +650,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -650,6 +650,7 @@ func (nl *NodeLink) serveRecv() {
nl.connMu.Unlock() nl.connMu.Unlock()
//fmt.Printf("%p\tconn: %v\n", nl, conn)
if conn == nil { if conn == nil {
// see ^^^ "message with connid that should be initiated by us" // see ^^^ "message with connid that should be initiated by us"
go nl.replyNoConn(connId, errConnClosed) go nl.replyNoConn(connId, errConnClosed)
...@@ -671,6 +672,8 @@ func (nl *NodeLink) serveRecv() { ...@@ -671,6 +672,8 @@ func (nl *NodeLink) serveRecv() {
} }
conn.rxqActive.Set(0) conn.rxqActive.Set(0)
//fmt.Printf("%p\tconn.rxdown: %v\taccept: %v\n", nl, rxdown, accept)
// conn exists but rx is down - "connection closed" // conn exists but rx is down - "connection closed"
// (this cannot happen for newly accepted connection) // (this cannot happen for newly accepted connection)
...@@ -701,6 +704,7 @@ func (nl *NodeLink) serveRecv() { ...@@ -701,6 +704,7 @@ func (nl *NodeLink) serveRecv() {
axdown = true axdown = true
case nl.acceptq <- conn: case nl.acceptq <- conn:
//fmt.Printf("%p\t.acceptq <- conn ok\n", nl)
// ok // ok
} }
} }
...@@ -713,6 +717,8 @@ func (nl *NodeLink) serveRecv() { ...@@ -713,6 +717,8 @@ func (nl *NodeLink) serveRecv() {
nl.connMu.Unlock() nl.connMu.Unlock()
} }
} }
//fmt.Printf("%p\tafter accept\n", nl)
/* /*
// XXX goes away in favour of .rxdownFlag; reasons // XXX goes away in favour of .rxdownFlag; reasons
// - no need to reallocate rxdown for light conn // - no need to reallocate rxdown for light conn
...@@ -758,9 +764,9 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} ...@@ -758,9 +764,9 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (link *NodeLink) replyNoConn(connId uint32, errMsg Msg) { func (link *NodeLink) replyNoConn(connId uint32, errMsg Msg) {
//fmt.Printf("%s .%d: -> replyNoConn %v\n", link, connId, c.errMsg) //fmt.Printf("%s .%d: -> replyNoConn %v\n", link, connId, errMsg)
link.sendMsg(connId, errMsg) // ignore errors link.sendMsg(connId, errMsg) // ignore errors
//fmt.Printf("%s .%d: replyNoConn(%v) -> %v\n", link, connId, c.errMsg, err) //fmt.Printf("%s .%d: replyNoConn(%v) -> %v\n", link, connId, errMsg, err)
} }
// ---- transmit ---- // ---- transmit ----
...@@ -1513,7 +1519,8 @@ func (link *NodeLink) Send1(msg Msg) error { ...@@ -1513,7 +1519,8 @@ func (link *NodeLink) Send1(msg Msg) error {
conn.downRX(errConnClosed) // FIXME just new conn this way conn.downRX(errConnClosed) // FIXME just new conn this way
err = conn.Send(msg) err = conn.Send(msg)
conn.release() //conn.release() XXX temp
conn.Close()
return err return err
} }
......
...@@ -37,8 +37,6 @@ import ( ...@@ -37,8 +37,6 @@ import (
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
"github.com/pkg/errors" "github.com/pkg/errors"
//"runtime/debug"
) )
func xclose(c io.Closer) { func xclose(c io.Closer) {
...@@ -763,55 +761,77 @@ func xSend1(l *NodeLink, msg Msg) { ...@@ -763,55 +761,77 @@ func xSend1(l *NodeLink, msg Msg) {
func xverifyMsg(msg1, msg2 Msg) { func xverifyMsg(msg1, msg2 Msg) {
if !reflect.DeepEqual(msg1, msg2) { if !reflect.DeepEqual(msg1, msg2) {
//debug.PrintStack()
exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2)) exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2))
} }
} }
func TestRecv1Mode(t *testing.T) { func TestRecv1Mode(t *testing.T) {
println("000") //println("000")
// Send1 // Send1
nl1, nl2 := nodeLinkPipe() nl1, nl2 := nodeLinkPipe()
wg := &xsync.WorkGroup{} wg := &xsync.WorkGroup{}
sync := make(chan int)
wg.Gox(func() { wg.Gox(func() {
defer func() {
if e := recover(); e != nil {
panic(e)
}
}()
//println("X aaa")
c := xaccept(nl2) c := xaccept(nl2)
//println("X aaa + 1")
msg := xRecv(c) msg := xRecv(c)
//println("X aaa + 2")
xverifyMsg(msg, &Ping{}) xverifyMsg(msg, &Ping{})
xSend(c, &Pong{}) xSend(c, &Pong{})
//println("X aaa + 3")
msg = xRecv(c) msg = xRecv(c)
//println("X aaa + 4")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
xclose(c) xclose(c)
println("X zzz") sync <- 1
//println("X zzz")
c = xaccept(nl2) c = xaccept(nl2)
msg = xRecv(c) msg = xRecv(c)
println("X zzz + 1") //fmt.Println("X zzz + 1", c, msg)
xverifyMsg(msg, &Error{ACK, "hello up there"}) xverifyMsg(msg, &Error{ACK, "hello up there"})
xSend(c, &Error{ACK, "hello to you too"}) xSend(c, &Error{ACK, "hello to you too"})
msg = xRecv(c) msg = xRecv(c)
println("X zzz + 2") //println("X zzz + 2")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
println("X zzz + 3") //println("X zzz + 3")
xclose(c)
}) })
println("aaa") //println("aaa")
xSend1(nl1, &Ping{}) xSend1(nl1, &Ping{})
println("bbb")
// before next Send1 wait till peer receives errConnClosed from us
// otherwise peer could be already in accept while our errConnClosed is received
// and there is only one receiving thread there ^^^
<-sync
//println("bbb")
xSend1(nl1, &Error{ACK, "hello up there"}) xSend1(nl1, &Error{ACK, "hello up there"})
println("ccc") //println("ccc")
xwait(wg) xwait(wg)
println("111") //println("111\n")
// Recv1: further packets with same connid are rejected with "connection closed" // Recv1: further packets with same connid are rejected with "connection closed"
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
wg.Gox(func() { wg.Gox(func() {
c := xnewconn(nl2) c := xnewconn(nl2)
//println("aaa")
xSend(c, &Ping{}) xSend(c, &Ping{})
//println("bbb")
xSend(c, &Ping{}) xSend(c, &Ping{})
//println("ccc")
msg := xRecv(c) msg := xRecv(c)
//println("ddd")
xverifyMsg(msg, errConnClosed) xverifyMsg(msg, errConnClosed)
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment