Commit 91be5cdd authored by Kirill Smelkov's avatar Kirill Smelkov

X everyone is listening from start; CloseAccept to disable listening - works

parent 0a27363b
...@@ -104,9 +104,10 @@ type Conn struct { ...@@ -104,9 +104,10 @@ type Conn struct {
errMsg *Error // error message for peer if rx is down errMsg *Error // error message for peer if rx is down
// after Close Conn is kept for some time in link.connTab so peer could // closing Conn is shutdown + some cleanup work to remove it from
// receive "connection closed" and then GC'ed // link.connTab including arming timers etc. Let this work be spawned only once.
gcOnce sync.Once // (for Conn.Close to be valid called several times)
closeOnce sync.Once
} }
...@@ -412,13 +413,13 @@ func (c *Conn) Close() error { ...@@ -412,13 +413,13 @@ func (c *Conn) Close() error {
} }
nl.connMu.Unlock() nl.connMu.Unlock()
*/ */
c.closeOnce.Do(func() {
atomic.StoreInt32(&c.rxclosed, 1) atomic.StoreInt32(&c.rxclosed, 1)
atomic.StoreInt32(&c.txclosed, 1) atomic.StoreInt32(&c.txclosed, 1)
c.shutdown() c.shutdown()
// adjust link.connTab // adjust link.connTab
keep := false var tmpclosed *Conn
nl.connMu.Lock() nl.connMu.Lock()
if nl.connTab != nil { if nl.connTab != nil {
// connection was initiated by us - simply delete - we always // connection was initiated by us - simply delete - we always
...@@ -428,26 +429,29 @@ func (c *Conn) Close() error { ...@@ -428,26 +429,29 @@ func (c *Conn) Close() error {
if c.connId == nl.nextConnId % 2 { if c.connId == nl.nextConnId % 2 {
delete(nl.connTab, c.connId) delete(nl.connTab, c.connId)
// connection was initiated by peer which we accepted. // connection was initiated by peer which we accepted - put special
// it is already shutted down. // "closed" connection into connTab entry for some time to reply
// keep connTab entry for it for some time to reply
// "connection closed" if another packet comes to it. // "connection closed" if another packet comes to it.
//
// ( we cannot reuse same connection since after it is marked as
// closed Send refuses to work )
} else { } else {
keep = true // c implicitly goes away from connTab
tmpclosed = nl.newConn(c.connId)
} }
} }
nl.connMu.Unlock() nl.connMu.Unlock()
if keep { if tmpclosed != nil {
c.gcOnce.Do(func() { tmpclosed.shutdownRX(errConnClosed)
time.AfterFunc(connKeepClosed, func() { time.AfterFunc(connKeepClosed, func() {
nl.connMu.Lock() nl.connMu.Lock()
delete(nl.connTab, c.connId) delete(nl.connTab, c.connId)
nl.connMu.Unlock() nl.connMu.Unlock()
}) })
})
} }
})
return nil return nil
} }
...@@ -700,7 +704,9 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"} ...@@ -700,7 +704,9 @@ var errConnRefused = &Error{PROTOCOL_ERROR, "connection refused"}
// replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection // replyNoConn sends error message to peer when a packet was sent to closed / nonexistent connection
func (c *Conn) replyNoConn() { func (c *Conn) replyNoConn() {
//fmt.Printf("%v: -> replyNoConn %v\n", c, c.errMsg)
c.Send(c.errMsg) // ignore errors c.Send(c.errMsg) // ignore errors
//fmt.Printf("%v: replyNoConn(%v) -> %v\n", c, c.errMsg, err)
} }
// ---- transmit ---- // ---- transmit ----
......
...@@ -168,8 +168,6 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) { ...@@ -168,8 +168,6 @@ func nodeLinkPipe() (nl1, nl2 *NodeLink) {
func TestNodeLink(t *testing.T) { func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup) // TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
println("000")
// Close vs recvPkt // Close vs recvPkt
nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 := _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg := &xsync.WorkGroup{} wg := &xsync.WorkGroup{}
...@@ -184,8 +182,6 @@ func TestNodeLink(t *testing.T) { ...@@ -184,8 +182,6 @@ func TestNodeLink(t *testing.T) {
xwait(wg) xwait(wg)
xclose(nl2) xclose(nl2)
println("222")
// Close vs sendPkt // Close vs sendPkt
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
...@@ -208,12 +204,10 @@ func TestNodeLink(t *testing.T) { ...@@ -208,12 +204,10 @@ func TestNodeLink(t *testing.T) {
tdelay() tdelay()
xclose(nl2) xclose(nl2)
}) })
println("222 + 1")
c, err := nl2.Accept() c, err := nl2.Accept()
if !(c == nil && xlinkError(err) == ErrLinkClosed) { if !(c == nil && xlinkError(err) == ErrLinkClosed) {
t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err) t.Fatalf("NodeLink.Accept() after close: conn = %v, err = %v", c, err)
} }
println("222 + 2")
wg.Gox(func() { wg.Gox(func() {
tdelay() tdelay()
nl1.CloseAccept() nl1.CloseAccept()
...@@ -229,12 +223,8 @@ func TestNodeLink(t *testing.T) { ...@@ -229,12 +223,8 @@ func TestNodeLink(t *testing.T) {
if !(c == nil && xlinkError(err) == ErrLinkNoListen) { if !(c == nil && xlinkError(err) == ErrLinkNoListen) {
t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err) t.Fatalf("NodeLink.Accept() on non-listening node link: conn = %v, err = %v", c, err)
} }
println("222 + 3")
xclose(nl1) xclose(nl1)
println("333")
// Close vs recvPkt on another side // Close vs recvPkt on another side
nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend) nl1, nl2 = _nodeLinkPipe(linkNoRecvSend, linkNoRecvSend)
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
...@@ -468,13 +458,12 @@ func TestNodeLink(t *testing.T) { ...@@ -468,13 +458,12 @@ func TestNodeLink(t *testing.T) {
} }
//println("\n---------------------\n")
saveKeepClosed := connKeepClosed saveKeepClosed := connKeepClosed
connKeepClosed = 10*time.Millisecond connKeepClosed = 10*time.Millisecond
// Conn accept + exchange // Conn accept + exchange
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
nl1.CloseAccept()
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
closed := make(chan int) closed := make(chan int)
wg.Gox(func() { wg.Gox(func() {
...@@ -494,27 +483,42 @@ func TestNodeLink(t *testing.T) { ...@@ -494,27 +483,42 @@ func TestNodeLink(t *testing.T) {
xclose(c) xclose(c)
closed <- 1 closed <- 1
//println("X ααα")
// once again as ^^^ but finish only with CloseRecv // once again as ^^^ but finish only with CloseRecv
c2 := xaccept(nl2) c2 := xaccept(nl2)
//println("X ααα + 1")
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
//println("X ααα + 2")
xverifyPkt(pkt, c2.connId, 41, []byte("ping5")) xverifyPkt(pkt, c2.connId, 41, []byte("ping5"))
xsendPkt(c2, mkpkt(42, []byte("pong5"))) xsendPkt(c2, mkpkt(42, []byte("pong5")))
//println("X βββ")
c2.CloseRecv() c2.CloseRecv()
closed <- 2 closed <- 2
//println("X γγγ")
// "connection refused" when trying to connect to not-listening peer // "connection refused" when trying to connect to not-listening peer
c = xnewconn(nl2) // XXX should get error here? c = xnewconn(nl2) // XXX should get error here?
xsendPkt(c, mkpkt(38, []byte("pong3"))) xsendPkt(c, mkpkt(38, []byte("pong3")))
//println("X γγγ + 1")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 2")
xverifyMsg(pkt, c.connId, errConnRefused) xverifyMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again
//println("X γγγ + 3")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 4")
xverifyMsg(pkt, c.connId, errConnRefused) xverifyMsg(pkt, c.connId, errConnRefused)
//println("X zzz")
xclose(c) xclose(c)
}) })
//println("000")
c1 := xnewconn(nl1) c1 := xnewconn(nl1)
xsendPkt(c1, mkpkt(33, []byte("ping"))) xsendPkt(c1, mkpkt(33, []byte("ping")))
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
...@@ -523,16 +527,22 @@ func TestNodeLink(t *testing.T) { ...@@ -523,16 +527,22 @@ func TestNodeLink(t *testing.T) {
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
xverifyPkt(pkt, c1.connId, 36, []byte("pong2")) xverifyPkt(pkt, c1.connId, 36, []byte("pong2"))
//println("111")
// "connection closed" after peer closed its end // "connection closed" after peer closed its end
<-closed <-closed
//println("111 + closed")
xsendPkt(c1, mkpkt(37, []byte("ping3"))) xsendPkt(c1, mkpkt(37, []byte("ping3")))
//println("111 + 1")
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 2")
xverifyMsg(pkt, c1.connId, errConnClosed) xverifyMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 4")
xverifyMsg(pkt, c1.connId, errConnClosed) xverifyMsg(pkt, c1.connId, errConnClosed)
// XXX also should get EOF on recv // XXX also should get EOF on recv
//println("222")
// one more time but now peer does only .CloseRecv() // one more time but now peer does only .CloseRecv()
c2 := xnewconn(nl1) c2 := xnewconn(nl1)
xsendPkt(c2, mkpkt(41, []byte("ping5"))) xsendPkt(c2, mkpkt(41, []byte("ping5")))
...@@ -543,7 +553,9 @@ func TestNodeLink(t *testing.T) { ...@@ -543,7 +553,9 @@ func TestNodeLink(t *testing.T) {
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
xverifyMsg(pkt, c2.connId, errConnClosed) xverifyMsg(pkt, c2.connId, errConnClosed)
//println("333 z")
xwait(wg) xwait(wg)
//println("444")
// make sure entry for closed nl2.1 stays in nl2.connTab // make sure entry for closed nl2.1 stays in nl2.connTab
nl2.connMu.Lock() nl2.connMu.Lock()
...@@ -560,12 +572,16 @@ func TestNodeLink(t *testing.T) { ...@@ -560,12 +572,16 @@ func TestNodeLink(t *testing.T) {
} }
nl2.connMu.Unlock() nl2.connMu.Unlock()
//println("555")
xclose(c1) xclose(c1)
xclose(c2) xclose(c2)
xclose(nl1) xclose(nl1)
xclose(nl2) xclose(nl2)
connKeepClosed = saveKeepClosed connKeepClosed = saveKeepClosed
//println("\nsss")
// test 2 channels with replies coming in reversed time order // test 2 channels with replies coming in reversed time order
nl1, nl2 = nodeLinkPipe() nl1, nl2 = nodeLinkPipe()
wg = &xsync.WorkGroup{} wg = &xsync.WorkGroup{}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment