Commit d38c5c77 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9fa79958
...@@ -99,13 +99,8 @@ type Conn struct { ...@@ -99,13 +99,8 @@ type Conn struct {
rxqActive int32 // atomic: 1 while serveRecv is doing `rxq <- ...` rxqActive int32 // atomic: 1 while serveRecv is doing `rxq <- ...`
rxdownFlag int32 // atomic: 1 when RX is marked no longer operational rxdownFlag int32 // atomic: 1 when RX is marked no longer operational
// !light only rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed XXX !light?
rxdown chan struct{} // ready when RX is marked no longer operational errMsg *Error // error message for peer if rx is down XXX try to do without it
rxdownOnce sync.Once // ----//---- XXX review
rxclosed int32 // whether CloseRecv was called
rxerrOnce sync.Once // rx error is reported only once - then it is link down or closed
errMsg *Error // error message for peer if rx is down
txerr chan error // transmit results for this Conn go back here txerr chan error // transmit results for this Conn go back here
...@@ -113,6 +108,21 @@ type Conn struct { ...@@ -113,6 +108,21 @@ type Conn struct {
txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown txdownOnce sync.Once // tx shutdown may be called by both Close and nodelink.shutdown
txclosed int32 // whether CloseSend was called txclosed int32 // whether CloseSend was called
// there are two modes a Conn could be used:
// - full mode - where full Conn functionality is working, and
// - light mode - where only subset functionality is working
//
// the light mode is used to implement Recv1 & friends - there any
// connection is used max to send and/or receive only 1 packet and then
// has to be reused for efficiency ideally without reallocating anything.
//
// everything below is used during !light mode only.
rxdown chan struct{} // ready when RX is marked no longer operational
rxdownOnce sync.Once // ----//---- XXX review
rxclosed int32 // whether CloseRecv was called
// closing Conn is shutdown + some cleanup work to remove it from // closing Conn is shutdown + some cleanup work to remove it from
// link.connTab including arming timers etc. Let this work be spawned only once. // link.connTab including arming timers etc. Let this work be spawned only once.
// (for Conn.Close to be valid called several times) // (for Conn.Close to be valid called several times)
...@@ -340,15 +350,14 @@ func (c *Conn) shutdownTX() { ...@@ -340,15 +350,14 @@ func (c *Conn) shutdownTX() {
// shutdownRX marks .rxq as no loner operational and interrupts Recv. // shutdownRX marks .rxq as no loner operational and interrupts Recv.
func (c *Conn) shutdownRX(errMsg *Error) { func (c *Conn) shutdownRX(errMsg *Error) {
c.rxdownOnce.Do(func() { c.rxdownOnce.Do(func() {
c.errMsg = errMsg
close(c.rxdown) // wakeup Conn.Recv close(c.rxdown) // wakeup Conn.Recv
c.downRX(errMsg)
c.downRX()
}) })
} }
// downRX marks .rxq as no longer operational // downRX marks .rxq as no longer operational.
func (c *Conn) downRX() { func (c *Conn) downRX(errMsg *Error) {
c.errMsg = errMsg
atomic.StoreInt32(&c.rxdownFlag, 1) atomic.StoreInt32(&c.rxdownFlag, 1)
// dequeue all packets already queued in c.rxq // dequeue all packets already queued in c.rxq
...@@ -1365,10 +1374,10 @@ func (link *NodeLink) Recv1() (Request, error) { ...@@ -1365,10 +1374,10 @@ func (link *NodeLink) Recv1() (Request, error) {
return Request{}, err return Request{}, err
} }
// noone will be reading from conn anymore - shutdown rx so that if // noone will be reading from conn anymore - mark rx down so that if
// peer sends any another packet with same .ConnID serveRecv does not // peer sends any another packet with same .ConnID serveRecv does not
// deadlock trying to put it to conn.rxq. // deadlock trying to put it to conn.rxq.
conn.CloseRecv() conn.downRX(errConnClosed)
return Request{Msg: msg, conn: conn}, nil return Request{Msg: msg, conn: conn}, nil
} }
......
...@@ -25,6 +25,7 @@ import ( ...@@ -25,6 +25,7 @@ import (
"context" "context"
"io" "io"
"net" "net"
"reflect"
"testing" "testing"
"time" "time"
...@@ -133,7 +134,7 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) { ...@@ -133,7 +134,7 @@ func xverifyPkt(pkt *PktBuf, connid uint32, msgcode uint16, payload []byte) {
} }
// Verify PktBuf to match expected message // Verify PktBuf to match expected message
func xverifyMsg(pkt *PktBuf, connid uint32, msg Msg) { func xverifyPktMsg(pkt *PktBuf, connid uint32, msg Msg) {
data := make([]byte, msg.neoMsgEncodedLen()) data := make([]byte, msg.neoMsgEncodedLen())
msg.neoMsgEncode(data) msg.neoMsgEncode(data)
xverifyPkt(pkt, connid, msg.neoMsgCode(), data) xverifyPkt(pkt, connid, msg.neoMsgCode(), data)
...@@ -522,12 +523,12 @@ func TestNodeLink(t *testing.T) { ...@@ -522,12 +523,12 @@ func TestNodeLink(t *testing.T) {
//println("X γγγ + 1") //println("X γγγ + 1")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 2") //println("X γγγ + 2")
xverifyMsg(pkt, c.connId, errConnRefused) xverifyPktMsg(pkt, c.connId, errConnRefused)
xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again xsendPkt(c, mkpkt(40, []byte("pong4"))) // once again
//println("X γγγ + 3") //println("X γγγ + 3")
pkt = xrecvPkt(c) pkt = xrecvPkt(c)
//println("X γγγ + 4") //println("X γγγ + 4")
xverifyMsg(pkt, c.connId, errConnRefused) xverifyPktMsg(pkt, c.connId, errConnRefused)
//println("X zzz") //println("X zzz")
...@@ -553,11 +554,11 @@ func TestNodeLink(t *testing.T) { ...@@ -553,11 +554,11 @@ func TestNodeLink(t *testing.T) {
//println("111 + 1") //println("111 + 1")
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 2") //println("111 + 2")
xverifyMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again xsendPkt(c1, mkpkt(39, []byte("ping4"))) // once again
pkt = xrecvPkt(c1) pkt = xrecvPkt(c1)
//println("111 + 4") //println("111 + 4")
xverifyMsg(pkt, c1.connId, errConnClosed) xverifyPktMsg(pkt, c1.connId, errConnClosed)
// XXX also should get EOF on recv // XXX also should get EOF on recv
//println("222") //println("222")
...@@ -569,7 +570,7 @@ func TestNodeLink(t *testing.T) { ...@@ -569,7 +570,7 @@ func TestNodeLink(t *testing.T) {
<-closed <-closed
xsendPkt(c2, mkpkt(41, []byte("ping6"))) xsendPkt(c2, mkpkt(41, []byte("ping6")))
pkt = xrecvPkt(c2) pkt = xrecvPkt(c2)
xverifyMsg(pkt, c2.connId, errConnClosed) xverifyPktMsg(pkt, c2.connId, errConnClosed)
//println("333 z") //println("333 z")
xwait(wg) xwait(wg)
...@@ -734,6 +735,48 @@ func TestHandshake(t *testing.T) { ...@@ -734,6 +735,48 @@ func TestHandshake(t *testing.T) {
} }
// ---- recv1 mode ----
func xSend(c *Conn, msg Msg) {
err := c.Send(msg)
exc.Raiseif(err)
}
func xRecv(c *Conn) Msg {
msg, err := c.Recv()
exc.Raiseif(err)
return msg
}
func xRecv1(l *NodeLink) Request {
req, err := l.Recv1()
exc.Raiseif(err)
return req
}
func xverifyMsg(msg1, msg2 Msg) {
if !reflect.DeepEqual(msg1, msg2) {
exc.Raisef("messages differ:\n%s", pretty.Compare(msg1, msg2))
}
}
func TestRecv1Mode(t *testing.T) {
// Recv1: further packets with same connid are rejected with "connection closed"
nl1, nl2 := nodeLinkPipe()
wg := &xsync.WorkGroup{}
wg.Gox(func() {
c := xnewconn(nl2)
xSend(c, &Ping{})
xSend(c, &Ping{})
msg := xRecv(c)
xverifyMsg(msg, errConnClosed)
})
_ = xRecv1(nl1)
xwait(wg)
}
// ---- benchmarks ---- // ---- benchmarks ----
// rtt over chan - for comparision as base // rtt over chan - for comparision as base
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment