Commit d129e07f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d29c0d3b
......@@ -220,7 +220,7 @@ func (nl *NodeLink) newConn(connId uint32) *Conn {
c := &Conn{nodeLink: nl,
connId: connId,
rxq: make(chan *PktBuf),
txerr: make(chan error),
txerr: make(chan error, 1), // NOTE non-blocking - see Conn.Send
closed: make(chan struct{}),
}
nl.connTab[connId] = c
......@@ -341,15 +341,43 @@ var ErrClosedConn = errors.New("read/write on closed connection")
func (c *Conn) Send(pkt *PktBuf) error {
// set pkt connId associated with this connection
pkt.Header().ConnId = hton32(c.connId)
var err error
select {
case <-c.closed:
return ErrClosedConn
case c.nodeLink.txreq <- txReq{pkt, c.txerr}:
err := <-c.txerr
return err // XXX adjust err with c?
select {
// tx request was sent to serveSend and is being transmitted on the wire.
// the transmission may block for indefinitely long though and
// we cannot interrupt it as the only way to interrup is
// .nodeLink.Close() which will close all other Conns.
//
// That's why we are also checking for c.closed while waiting
// for reply from serveSend (and leave pkt to finish transmitting).
//
// NOTE after we return straight here serveSend won't be blocked on
// c.txerr<- because that backchannel is a non-blocking one.
case <-c.closed:
return ErrClosedConn
case err = <-c.txerr:
}
}
// if we got transmission error maybe it was due to underlying NodeLink
// being closed. If our Conn was also requested to be closed adjust err
// to ErrClosedConn along the way.
if err != nil {
select {
case <-c.closed:
err = ErrClosedConn
default:
}
}
return err
}
// Receive packet from connection
......
......@@ -131,42 +131,22 @@ func tdelay() {
time.Sleep(1*time.Millisecond)
}
func _nodeLinkPipe(flags ConnRole) (nl1, nl2 *NodeLink) {
func _nodeLinkPipe(flags1, flags2 ConnRole) (nl1, nl2 *NodeLink) {
node1, node2 := net.Pipe()
nl1 = NewNodeLink(node1, ConnClient | flags)
nl2 = NewNodeLink(node2, ConnServer | flags)
nl1 = NewNodeLink(node1, ConnClient | flags1)
nl2 = NewNodeLink(node2, ConnServer | flags2)
return nl1, nl2
}
// create NodeLinks connected via net.Pipe
func nodeLinkPipe() (nl1, nl2 *NodeLink) {
return _nodeLinkPipe(0)
}
func TestPipe(t *testing.T) {
p1, p2 := net.Pipe()
wg := WorkGroup()
wg.Gox(func() {
tdelay()
println("aaa")
xclose(p1)
println("bbb")
})
println("111")
_, err := p1.Write([]byte("data"))
println("222")
if err != io.ErrClosedPipe {
t.Fatalf("unexpected err = %v", err)
}
xwait(wg)
xclose(p2)
return _nodeLinkPipe(0, 0)
}
func TestNodeLink(t *testing.T) {
// TODO catch exception -> add proper location from it -> t.Fatal (see git-backup)
// Close vs recvPkt
println("111")
nl1, nl2 := _nodeLinkPipe(connNoRecvSend)
nl1, nl2 := _nodeLinkPipe(connNoRecvSend, connNoRecvSend)
wg := WorkGroup()
wg.Gox(func() {
tdelay()
......@@ -180,29 +160,22 @@ func TestNodeLink(t *testing.T) {
xclose(nl2)
// Close vs sendPkt
println("222")
nl1, nl2 = _nodeLinkPipe(connNoRecvSend)
nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend)
wg = WorkGroup()
wg.Gox(func() {
tdelay()
println("close(nl1) aaa")
xclose(nl1)
println("close(nl1) bbb")
})
pkt = &PktBuf{[]byte("data")}
println("zzz")
err = nl1.sendPkt(pkt)
println("qqq")
if err != io.ErrClosedPipe {
t.Fatalf("NodeLink.sendPkt() after close: err = %v", err)
}
xwait(wg)
xclose(nl2)
return
// check raw exchange works
println("333")
nl1, nl2 = _nodeLinkPipe(connNoRecvSend)
nl1, nl2 = _nodeLinkPipe(connNoRecvSend, connNoRecvSend)
wg, ctx := WorkGroupCtx(context.Background())
wg.Gox(func() {
......@@ -233,10 +206,9 @@ func TestNodeLink(t *testing.T) {
// Test connections on top of nodelink
println("444")
nl1, nl2 = nodeLinkPipe()
// Close vs Recv
nl1, nl2 = _nodeLinkPipe(0, connNoRecvSend)
c := nl1.NewConn()
wg = WorkGroup()
wg.Gox(func() {
......@@ -248,12 +220,11 @@ func TestNodeLink(t *testing.T) {
t.Fatalf("Conn.Recv() after close: pkt = %v err = %v", pkt, err)
}
xwait(wg)
xclose(nl1)
xclose(nl2)
// Close vs Send
println("444.2")
//xclose(nl2) // so it does not receive what nl1 sends XXX wrong ->
//this will just make Send return error right after call
// TODO what is needed is nl2.serveRecv shutdown
nl1, nl2 = _nodeLinkPipe(0, connNoRecvSend)
c = nl1.NewConn()
wg = WorkGroup()
wg.Gox(func() {
......@@ -267,6 +238,8 @@ func TestNodeLink(t *testing.T) {
}
xwait(wg)
return
// NodeLink.Close vs Conn.Send/Recv
c11 := nl1.NewConn()
c12 := nl1.NewConn()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment