Commit b50ecee2 authored by Kirill Smelkov's avatar Kirill Smelkov

*: Adapt code to xnet.Networker changes for Listen/Accept to handle cancellation

In the previous patch xnet.Networker was changed: Listen now accepts ctx
and returns xnet.Listener instead of net.Listener.

-> Adapt the code all around to that.
parent 3354b401
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -30,22 +30,23 @@ import ( ...@@ -30,22 +30,23 @@ import (
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/internal/xtesting" "lab.nexedi.com/kirr/go123/internal/xtesting"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/virtnet" "lab.nexedi.com/kirr/go123/xnet/virtnet"
) )
type mklistener interface { type mklistener interface {
Listen(string) (net.Listener, error) Listen(context.Context, string) (xnet.Listener, error)
} }
func xlisten(n mklistener, laddr string) net.Listener { func xlisten(ctx context.Context, n mklistener, laddr string) xnet.Listener {
l, err := n.Listen(laddr) l, err := n.Listen(ctx, laddr)
exc.Raiseif(err) exc.Raiseif(err)
return l return l
} }
func xaccept(l net.Listener) net.Conn { func xaccept(ctx context.Context, l xnet.Listener) net.Conn {
c, err := l.Accept() c, err := l.Accept(ctx)
exc.Raiseif(err) exc.Raiseif(err)
return c return c
} }
...@@ -108,7 +109,7 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) { ...@@ -108,7 +109,7 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) {
_, err = .Dial(ctx, ":0") _, err = .Dial(ctx, ":0")
assert.Eq(err, &net.OpError{Op: "dial", Net: subnet.Network(), Source: xaddr("α:1"), Addr: xaddr("α:0"), Err: virtnet.ErrConnRefused}) assert.Eq(err, &net.OpError{Op: "dial", Net: subnet.Network(), Source: xaddr("α:1"), Addr: xaddr("α:0"), Err: virtnet.ErrConnRefused})
l1, err := .Listen("") l1, err := .Listen(ctx, "")
X(err) X(err)
assert.Eq(l1.Addr(), xaddr("α:1")) assert.Eq(l1.Addr(), xaddr("α:1"))
...@@ -118,14 +119,14 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) { ...@@ -118,14 +119,14 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) {
wg := &errgroup.Group{} wg := &errgroup.Group{}
wg.Go(exc.Funcx(func() { wg.Go(exc.Funcx(func() {
c1s := xaccept(l1) c1s := xaccept(ctx, l1)
assert.Eq(c1s.LocalAddr(), xaddr("α:2")) assert.Eq(c1s.LocalAddr(), xaddr("α:2"))
assert.Eq(c1s.RemoteAddr(), xaddr("β:1")) assert.Eq(c1s.RemoteAddr(), xaddr("β:1"))
assert.Eq(xread(c1s), "ping") // XXX for !pipe could read less assert.Eq(xread(c1s), "ping") // XXX for !pipe could read less
xwrite(c1s, "pong") xwrite(c1s, "pong")
c2s := xaccept(l1) c2s := xaccept(ctx, l1)
assert.Eq(c2s.LocalAddr(), xaddr("α:3")) assert.Eq(c2s.LocalAddr(), xaddr("α:3"))
assert.Eq(c2s.RemoteAddr(), xaddr("β:2")) assert.Eq(c2s.RemoteAddr(), xaddr("β:2"))
...@@ -149,6 +150,6 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) { ...@@ -149,6 +150,6 @@ func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) {
xwait(wg) xwait(wg)
l2 := xlisten(, ":0") // autobind again l2 := xlisten(ctx, , ":0") // autobind again
assert.Eq(l2.Addr(), xaddr("α:4")) assert.Eq(l2.Addr(), xaddr("α:4"))
} }
// Copyright (C) 2018 Nexedi SA and Contributors. // Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -48,11 +48,11 @@ ...@@ -48,11 +48,11 @@
// hβ, err := net.NewHost(ctx, "β") // hβ, err := net.NewHost(ctx, "β")
// //
// // starts listening on address "α:10" // // starts listening on address "α:10"
// l, err := hα.Listen(":10") // l, err := hα.Listen(ctx, ":10")
// go func() { // go func() {
// csrv, err := l.Accept() // csrv will have LocalAddr "α:1" // csrv, err := l.Accept(ctx) // csrv will have LocalAddr "α:1"
// }() // }()
// ccli, err := hβ.Dial(ctx, "α:10") // ccli will be connection between "β:1" - "α:1" // ccli, err := hβ.Dial(ctx, "α:10") // ccli will be connection between "β:1" - "α:1"
// //
// Once again lonet is similar to pipenet, but since it works via OS TCP stack // Once again lonet is similar to pipenet, but since it works via OS TCP stack
// it could be handy for testing networked application when there are several // it could be handy for testing networked application when there are several
...@@ -141,7 +141,7 @@ type subNetwork struct { ...@@ -141,7 +141,7 @@ type subNetwork struct {
// OS-level listener of this subnetwork. // OS-level listener of this subnetwork.
// whenever connection to subnet's host is tried to be established it goes here. // whenever connection to subnet's host is tried to be established it goes here.
oslistener net.Listener oslistener xnet.Listener
// accepted connections are further routed here for virtnet to handle. // accepted connections are further routed here for virtnet to handle.
vnotify virtnet.Notifier vnotify virtnet.Notifier
...@@ -197,7 +197,7 @@ func Join(ctx context.Context, network string) (_ *virtnet.SubNetwork, err error ...@@ -197,7 +197,7 @@ func Join(ctx context.Context, network string) (_ *virtnet.SubNetwork, err error
} }
// start OS listener // start OS listener
oslistener, err := tcp4.Listen("127.0.0.1:") oslistener, err := tcp4.Listen(ctx, "127.0.0.1:")
if err != nil { if err != nil {
registry.Close() registry.Close()
return nil, err return nil, err
...@@ -238,7 +238,7 @@ func (n *subNetwork) serve(ctx context.Context) { ...@@ -238,7 +238,7 @@ func (n *subNetwork) serve(ctx context.Context) {
// wait for incoming OS connections and do lonet protocol handshake on them. // wait for incoming OS connections and do lonet protocol handshake on them.
// if successful - route handshaked connection to particular Host's listener. // if successful - route handshaked connection to particular Host's listener.
for { for {
osconn, err := n.oslistener.Accept() osconn, err := n.oslistener.Accept(ctx)
if err != nil { if err != nil {
// mark subnetwork as being down and stop // mark subnetwork as being down and stop
n.vnotify.VNetDown(err) n.vnotify.VNetDown(err)
......
// Copyright (C) 2018 Nexedi SA and Contributors. // Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -70,11 +70,11 @@ func TestLonetGoPy(t *testing.T) { ...@@ -70,11 +70,11 @@ func TestLonetGoPy(t *testing.T) {
} }
, err := subnet.NewHost(bg, "α"); X(err) , err := subnet.NewHost(bg, "α"); X(err)
, err := .Listen(":1"); X(err) , err := .Listen(bg, ":1"); X(err)
wg := &errgroup.Group{} wg := &errgroup.Group{}
wg.Go(exc.Funcx(func() { wg.Go(exc.Funcx(func() {
c1, err := .Accept(); X(err) c1, err := .Accept(bg); X(err)
assert.Eq(c1.LocalAddr(), xaddr("α:2")) assert.Eq(c1.LocalAddr(), xaddr("α:2"))
assert.Eq(c1.RemoteAddr(), xaddr("β:2")) assert.Eq(c1.RemoteAddr(), xaddr("β:2"))
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -28,9 +28,9 @@ ...@@ -28,9 +28,9 @@
// h1 := net.Host("abc") // h1 := net.Host("abc")
// h2 := net.Host("def") // h2 := net.Host("def")
// //
// l, err := h1.Listen(":10") // starts listening on address "abc:10" // l, err := h1.Listen(ctx, ":10") // starts listening on address "abc:10"
// go func() { // go func() {
// csrv, err := l.Accept() // csrv will have LocalAddr "abc:1" // csrv, err := l.Accept(ctx) // csrv will have LocalAddr "abc:1"
// }() // }()
// ccli, err := h2.Dial(ctx, "abc:10") // ccli will be connection between "def:1" - "abc:1" // ccli, err := h2.Dial(ctx, "abc:10") // ccli will be connection between "def:1" - "abc:1"
// //
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors. // Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
...@@ -172,7 +172,7 @@ type conn struct { ...@@ -172,7 +172,7 @@ type conn struct {
closeOnce sync.Once closeOnce sync.Once
} }
// listener implements net.Listener for Host.Listen . // listener implements xnet.Listener for Host.Listen .
type listener struct { type listener struct {
// subnetwork/host/port we are listening on // subnetwork/host/port we are listening on
socket *socket socket *socket
...@@ -361,7 +361,7 @@ func (h *Host) Close() (err error) { ...@@ -361,7 +361,7 @@ func (h *Host) Close() (err error) {
// It either allocates free port if laddr is "" or with 0 port, or binds to laddr. // It either allocates free port if laddr is "" or with 0 port, or binds to laddr.
// Once listener is started, Dials could connect to listening address. // Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept. // Connection requests created by Dials could be accepted via Accept.
func (h *Host) Listen(laddr string) (_ net.Listener, err error) { func (h *Host) Listen(ctx context.Context, laddr string) (_ xnet.Listener, err error) {
var netladdr net.Addr var netladdr net.Addr
defer func() { defer func() {
if err != nil { if err != nil {
...@@ -373,6 +373,10 @@ func (h *Host) Listen(laddr string) (_ net.Listener, err error) { ...@@ -373,6 +373,10 @@ func (h *Host) Listen(laddr string) (_ net.Listener, err error) {
laddr = ":0" laddr = ":0"
} }
if err := ctx.Err(); err != nil {
return nil, err
}
a, err := h.parseAddr(laddr) a, err := h.parseAddr(laddr)
if err != nil { if err != nil {
return nil, err return nil, err
...@@ -454,7 +458,7 @@ func (l *listener) Close() error { ...@@ -454,7 +458,7 @@ func (l *listener) Close() error {
} }
// Accept tries to connect to Dial called with addr corresponding to our listener. // Accept tries to connect to Dial called with addr corresponding to our listener.
func (l *listener) Accept() (_ net.Conn, err error) { func (l *listener) Accept(ctx context.Context) (_ net.Conn, err error) {
h := l.socket.host h := l.socket.host
defer func() { defer func() {
...@@ -467,6 +471,8 @@ func (l *listener) Accept() (_ net.Conn, err error) { ...@@ -467,6 +471,8 @@ func (l *listener) Accept() (_ net.Conn, err error) {
var req dialReq var req dialReq
select { select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.down: case <-l.down:
return nil, l.errDown() return nil, l.errDown()
...@@ -484,9 +490,18 @@ func (l *listener) Accept() (_ net.Conn, err error) { ...@@ -484,9 +490,18 @@ func (l *listener) Accept() (_ net.Conn, err error) {
req.resp <- &Accept{sk.addr(), ack} req.resp <- &Accept{sk.addr(), ack}
// wait for ack from acceptor. // wait for ack from acceptor.
var noack error
select { select {
case <-ctx.Done():
noack = ctx.Err()
case <-l.down: case <-l.down:
// acceptor was slow and we have to shutdown the listener. noack = l.errDown()
case err = <-ack:
// ok
}
if noack != nil {
// acceptor was slow.
// we have to make sure we still receive on ack and // we have to make sure we still receive on ack and
// close req.conn / unallocate the socket appropriately. // close req.conn / unallocate the socket appropriately.
go func() { go func() {
...@@ -500,10 +515,7 @@ func (l *listener) Accept() (_ net.Conn, err error) { ...@@ -500,10 +515,7 @@ func (l *listener) Accept() (_ net.Conn, err error) {
h.sockMu.Unlock() h.sockMu.Unlock()
}() }()
return nil, l.errDown() return nil, noack
case err = <-ack:
// ok
} }
// we got feedback from acceptor // we got feedback from acceptor
......
// Copyright (C) 2018 Nexedi SA and Contributors. // Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com> // Kirill Smelkov <kirr@nexedi.com>
// //
// This program is free software: you can Use, Study, Modify and Redistribute // This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your // it under the terms of the GNU General Public License version 3, or (at your
...@@ -31,6 +31,7 @@ import ( ...@@ -31,6 +31,7 @@ import (
"lab.nexedi.com/kirr/go123/exc" "lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/internal/xtesting" "lab.nexedi.com/kirr/go123/internal/xtesting"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet" "lab.nexedi.com/kirr/go123/xnet/pipenet"
. "lab.nexedi.com/kirr/go123/xnet/virtnet" . "lab.nexedi.com/kirr/go123/xnet/virtnet"
...@@ -47,7 +48,7 @@ type testNet struct { ...@@ -47,7 +48,7 @@ type testNet struct {
net *SubNetwork net *SubNetwork
, *Host , *Host
, net.Listener , xnet.Listener
cαβ, cβα net.Conn cαβ, cβα net.Conn
} }
...@@ -66,11 +67,11 @@ func newTestNet(t0 testing.TB) *testNet { ...@@ -66,11 +67,11 @@ func newTestNet(t0 testing.TB) *testNet {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t., err = t..Listen("") t., err = t..Listen(context.Background(), "")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t., err = t..Listen("") t., err = t..Listen(context.Background(), "")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
...@@ -85,7 +86,7 @@ func newTestNet(t0 testing.TB) *testNet { ...@@ -85,7 +86,7 @@ func newTestNet(t0 testing.TB) *testNet {
}() }()
wg.Go(func() error { wg.Go(func() error {
c, err := t..Accept() c, err := t..Accept(context.Background())
if err != nil { if err != nil {
return err return err
} }
...@@ -263,14 +264,14 @@ func TestClose(t *testing.T) { ...@@ -263,14 +264,14 @@ func TestClose(t *testing.T) {
// host.Listen vs subnet.Close // host.Listen vs subnet.Close
testClose(t, "subnet", func(t *testNet) { testClose(t, "subnet", func(t *testNet) {
l, err := t..Listen("") l, err := t..Listen(bg, "")
assert.Eq(l, nil) assert.Eq(l, nil)
assert.Eq(err, xneterr("listen", "α:0", ErrNetDown)) assert.Eq(err, xneterr("listen", "α:0", ErrNetDown))
}, serialOnly) }, serialOnly)
// host.Listen vs host.Close // host.Listen vs host.Close
testClose(t, "hα", func(t *testNet) { testClose(t, "hα", func(t *testNet) {
l, err := t..Listen("") l, err := t..Listen(bg, "")
assert.Eq(l, nil) assert.Eq(l, nil)
assert.Eq(err, xneterr("listen", "α:0", ErrHostDown)) assert.Eq(err, xneterr("listen", "α:0", ErrHostDown))
}, serialOnly) }, serialOnly)
...@@ -279,21 +280,21 @@ func TestClose(t *testing.T) { ...@@ -279,21 +280,21 @@ func TestClose(t *testing.T) {
// listener.Accept vs subnet.Close // listener.Accept vs subnet.Close
testClose(t, "subnet", func(t *testNet) { testClose(t, "subnet", func(t *testNet) {
c, err := t..Accept() c, err := t..Accept(bg)
assert.Eq(c, nil) assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrNetDown)) assert.Eq(err, xneterr("accept", "α:1", ErrNetDown))
}) })
// listener.Accept vs host.Close // listener.Accept vs host.Close
testClose(t, "hα", func(t *testNet) { testClose(t, "hα", func(t *testNet) {
c, err := t..Accept() c, err := t..Accept(bg)
assert.Eq(c, nil) assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrHostDown)) assert.Eq(err, xneterr("accept", "α:1", ErrHostDown))
}) })
// listener.Accept vs listener.Close // listener.Accept vs listener.Close
testClose(t, "lα", func(t *testNet) { testClose(t, "lα", func(t *testNet) {
c, err := t..Accept() c, err := t..Accept(bg)
assert.Eq(c, nil) assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrSockDown)) assert.Eq(err, xneterr("accept", "α:1", ErrSockDown))
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment