Commit 40120cb0 authored by Kirill Smelkov's avatar Kirill Smelkov

xnet/pipenet: Generalize it into xnet/virtnet

As we are going to implement another virtual network it would be good to
share common code between implementations. For this generalize pipenet
implementation to also cover the case when one does not own full network
and owns only some hosts of it.

An example of such situation is when one process handles one group of
virtual hosts and another process handles another group of virtual
hosts. Below a group of virtual hosts handled as part of network is
called subnetwork.

If hosts are not created in one place, we need a way to communicate
information about new hosts in between subnetworks. This leads to using
some kind of "registry" (see Registry interface).

Then for the common code to be reused by a virtual network
implementation it has to provide its working in the form of Engine
interface to that common code. In turn the common code exposes another
- Notifier - interface for particular network implementation to notify
common code of events that come from outside to the subnetwork.

Pipenet is reworked to be just a client of the common virtnet
infrastructure.

Virtnet documentation follows:

"""
Package virtnet provides infrastructure for TCP-like virtual networks.

For testing distributed systems it is sometimes handy to imitate network of
several TCP hosts. It is also handy that ports allocated on Dial/Listen/Accept
on that hosts be predictable - that would help tests to verify network
events against expected sequence.

Package virtnet provides infrastructure for using and implementing such
TCP-like virtual networks.

Using virtnet networks

Addresses on a virtnet network are host:port pairs represented by Addr.
A network conceptually consists of several SubNetworks each being home for
multiple Hosts. Host is xnet.Networker and so can be worked with similarly
to regular TCP network access-point with Dial/Listen/Accept. Host's ports
allocation is predictable: ports of a host are contiguous integer sequence
starting from 1 that are all initially free, and whenever autobind is
requested the first free port of the host will be used.
Virtnet ensures that host names are unique throughout whole network.

To work with a virtnet network, one uses corresponding package for
particular virtnet network implementation. Such packages provide a way to
join particular network and after joining give back SubNetwork to user.
Starting from SubNetwork one can create Hosts and from those exchange data
throughout whole network.

Please see package lab.nexedi.com/kirr/go123/xnet/pipenet for particular
well-known virtnet-based network.

Implementing virtnet networks

To implement a virtnet-based network one need to implement Engine and Registry.

A virtnet network implementation should provide Engine and Registry
instances to SubNetwork when creating it. The subnetwork will use provided
engine and registry for its operations. A virtnet network implementation
receives instance of Notifier together with SubNetwork when creating it. The
network implementation should use provided Notifier to notify the subnetwork
to handle incoming events.

Please see Engine, Registry and Notifier documentation for details.
"""

Another virtnet-based network that is not limited to be used only in 1
OS process will follow next.
parent f04d243b
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package xtesting provides addons to std package testing.
//
// The tools provided are mostly useful when doing tests with exceptions.
package xtesting
import (
"fmt"
"reflect"
"testing"
"lab.nexedi.com/kirr/go123/exc"
)
// Asserter is handy objects to make asserts in tests.
//
// For example:
//
// assert := xtesting.Assert(t)
// assert.Eq(a, b)
// ..
//
// Contrary to t.Fatal* and e.g. github.com/stretchr/testify/require.Assert it
// is safe to use Asserter from non-main goroutine.
type Asserter struct {
t testing.TB
}
// Assert creates Asserter bound to t for reporting.
func Assert(t testing.TB) *Asserter {
return &Asserter{t}
}
// Eq asserts that a == b and raises exception if not.
func (x *Asserter) Eq(a, b interface{}) {
x.t.Helper()
if !reflect.DeepEqual(a, b) {
fmt.Printf("not equal:\nhave: %v\nwant: %v\n", a, b)
x.t.Errorf("not equal:\nhave: %v\nwant: %v", a, b)
exc.Raise(0)
}
}
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package virtnettest provides basic tests to be run on virtnet network implementations.
package virtnettest
import (
"context"
"io"
"net"
"testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/internal/xtesting"
"lab.nexedi.com/kirr/go123/xnet/virtnet"
)
type mklistener interface {
Listen(string) (net.Listener, error)
}
func xlisten(n mklistener, laddr string) net.Listener {
l, err := n.Listen(laddr)
exc.Raiseif(err)
return l
}
func xaccept(l net.Listener) net.Conn {
c, err := l.Accept()
exc.Raiseif(err)
return c
}
type dialer interface {
Dial(context.Context, string) (net.Conn, error)
}
func xdial(n dialer, addr string) net.Conn {
c, err := n.Dial(context.Background(), addr)
exc.Raiseif(err)
return c
}
func xread(r io.Reader) string {
buf := make([]byte, 4096)
n, err := r.Read(buf)
exc.Raiseif(err)
return string(buf[:n])
}
func xwrite(w io.Writer, data string) {
_, err := w.Write([]byte(data))
exc.Raiseif(err)
}
func xwait(w interface { Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
// TestBasic runs basic tests on a virtnet network implementation.
func TestBasic(t *testing.T, subnet *virtnet.SubNetwork) {
X := exc.Raiseif
ctx := context.Background()
assert := xtesting.Assert(t)
defer func() {
err := subnet.Close()
X(err)
}()
xaddr := func(addr string) *virtnet.Addr {
a, err := virtnet.ParseAddr(subnet.Network(), addr)
X(err)
return a
}
, err := subnet.NewHost(ctx, "α")
X(err)
, err := subnet.NewHost(ctx, "β")
X(err)
assert.Eq(.Network(), subnet.Network())
assert.Eq(.Network(), subnet.Network())
assert.Eq(.Name(), "α")
assert.Eq(.Name(), "β")
_, err = .Dial(ctx, ":0")
assert.Eq(err, &net.OpError{Op: "dial", Net: subnet.Network(), Source: xaddr("α:1"), Addr: xaddr("α:0"), Err: virtnet.ErrConnRefused})
l1, err := .Listen("")
X(err)
assert.Eq(l1.Addr(), xaddr("α:1"))
// zero port always stays unused even after autobind
_, err = .Dial(ctx, ":0")
assert.Eq(err, &net.OpError{Op: "dial", Net: subnet.Network(), Source: xaddr("α:2"), Addr: xaddr("α:0"), Err: virtnet.ErrConnRefused})
wg := &errgroup.Group{}
wg.Go(exc.Funcx(func() {
c1s := xaccept(l1)
assert.Eq(c1s.LocalAddr(), xaddr("α:2"))
assert.Eq(c1s.RemoteAddr(), xaddr("β:1"))
assert.Eq(xread(c1s), "ping") // XXX for !pipe could read less
xwrite(c1s, "pong")
c2s := xaccept(l1)
assert.Eq(c2s.LocalAddr(), xaddr("α:3"))
assert.Eq(c2s.RemoteAddr(), xaddr("β:2"))
assert.Eq(xread(c2s), "hello")
xwrite(c2s, "world")
}))
c1c := xdial(, "α:1")
assert.Eq(c1c.LocalAddr(), xaddr("β:1"))
assert.Eq(c1c.RemoteAddr(), xaddr("α:2"))
xwrite(c1c, "ping")
assert.Eq(xread(c1c), "pong")
c2c := xdial(, "α:1")
assert.Eq(c2c.LocalAddr(), xaddr("β:2"))
assert.Eq(c2c.RemoteAddr(), xaddr("α:3"))
xwrite(c2c, "hello")
assert.Eq(xread(c2c), "world")
xwait(wg)
l2 := xlisten(, ":0") // autobind again
assert.Eq(l2.Addr(), xaddr("α:4"))
}
......@@ -38,101 +38,44 @@
// process without going to OS networking stack.
package pipenet
// TODO Fix pipenet for TCP semantic: there port(accepted) = port(listen), i.e.
// When we connect www.nexedi.com:80, remote addr of socket will have port 80.
// Likewise on server side accepted socket will have local port 80.
// The connection should be thus fully identified by src-dst address pair.
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"sync"
"lab.nexedi.com/kirr/go123/xnet"
)
const netPrefix = "pipe" // pipenet package creates only "pipe*" networks
"github.com/pkg/errors"
var (
errNetClosed = errors.New("network connection closed")
errAddrAlreadyUsed = errors.New("address already in use")
errAddrNoListen = errors.New("cannot listen on requested address")
errConnRefused = errors.New("connection refused")
"lab.nexedi.com/kirr/go123/xnet/virtnet"
)
// Addr represents address of a pipenet endpoint.
type Addr struct {
Net string // full network name, e.g. "pipe"
Host string // name of host access point on the network
Port int // port on host
}
const netPrefix = "pipe" // pipenet package creates only "pipe*" networks
// Network implements synchronous in-memory TCP-like network of pipes.
type Network struct {
// name of this network under "pipe" namespace -> e.g. ""
// full network name will be reported as "pipe"+name
name string
// big network lock for everything dynamic under Network
// (e.g. Host.socketv too)
mu sync.Mutex
hostMap map[string]*Host
vnet *virtnet.SubNetwork
vnotify virtnet.Notifier
}
// Host represents named access point on Network.
type Host struct {
// vengine implements virtnet.Engine for Network.
type vengine struct {
network *Network
name string
// NOTE protected by Network.mu
socketv []*socket // port -> listener | conn ; [0] is always nil
}
var _ xnet.Networker = (*Host)(nil)
// socket represents one endpoint entry on Host.
// ramRegistry implements virtnet.Registry in RAM.
//
// it can be either already connected or listening.
type socket struct {
host *Host // host/port this socket is bound to
port int
conn *conn // connection endpoint is here if != nil
listener *listener // listener is waiting here if != nil
}
// conn represents one endpoint of connection created under Network.
type conn struct {
socket *socket
peersk *socket // the other side of this connection
net.Conn
closeOnce sync.Once
}
// listener implements net.Listener for Host.Listen .
type listener struct {
// network/host/port we are listening on
socket *socket
dialq chan dialReq // Dial requests to our port go here
down chan struct{} // Close -> down=ready
closeOnce sync.Once
}
// Pipenet does not need a registry but virtnet is built for general case which
// needs one.
//
// Essentially it works as map protected by mutex.
type ramRegistry struct {
name string
// dialReq represents one dial request to listener.
type dialReq struct {
from *Host
resp chan net.Conn
mu sync.Mutex
hostTab map[string]string // hostname -> hostdata
closed bool // 1 after Close
}
// ----------------------------------------
// New creates new pipenet Network.
//
// Name is name of this network under "pipe" namespace, e.g. "α" will give full
......@@ -140,305 +83,169 @@ type dialReq struct {
//
// New does not check whether network name provided is unique.
func New(name string) *Network {
return &Network{name: name, hostMap: make(map[string]*Host)}
netname := netPrefix + name
n := &Network{}
v := &vengine{n}
r := newRAMRegistry(fmt.Sprintf("ram(%s)", netname))
subnet, vnotify := virtnet.NewSubNetwork(netname, v, r)
n.vnet = subnet
n.vnotify = vnotify
return n
}
// Host returns network access point by name.
// AsVirtNet exposes Network as virtnet subnetwork.
//
// If there was no such host before it creates new one.
func (n *Network) Host(name string) *Host {
n.mu.Lock()
defer n.mu.Unlock()
host := n.hostMap[name]
if host == nil {
host = &Host{network: n, name: name}
n.hostMap[name] = host
}
return host
// Since pipenet works entirely in RAM and in 1 OS process, its user interface
// is simpler compared to more general virtnet - for example there is no error
// when creating hosts. However sometimes it is handy to get access to pipenet
// network via full virtnet interface, when the code that is using pipenet
// network does not want to depend on pipenet API specifics.
func AsVirtNet(n *Network) *virtnet.SubNetwork {
return n.vnet
}
// resolveAddr resolves addr on the network from the host point of view.
//
// must be called with Network.mu held.
func (h *Host) resolveAddr(addr string) (host *Host, port int, err error) {
a, err := h.network.ParseAddr(addr)
if err != nil {
return nil, 0, err
}
// local host if host name omitted
if a.Host == "" {
a.Host = h.name
}
host = h.network.hostMap[a.Host]
if host == nil {
return nil, 0, &net.AddrError{Err: "no such host", Addr: addr}
}
return host, a.Port, nil
// Network returns name of the network.
func (n *Network) Network() string {
return n.vnet.Network()
}
// Listen starts new listener on the host.
// Host returns network access point by name.
//
// It either allocates free port if laddr is "" or with 0 port, or binds to laddr.
// Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept.
func (h *Host) Listen(laddr string) (net.Listener, error) {
h.network.mu.Lock()
defer h.network.mu.Unlock()
var sk *socket
if laddr == "" {
laddr = ":0"
}
var netladdr net.Addr
lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: h.Network(), Addr: netladdr, Err: err}
// If there was no such host before it creates new one.
//
// Host panics if underlying virtnet subnetwork was shut down.
func (n *Network) Host(name string) *virtnet.Host {
// check if it is already there
host := n.vnet.Host(name)
if host != nil {
return host
}
host, port, err := h.resolveAddr(laddr)
if err != nil {
return nil, lerr(err)
// if not - create it. Creation will not block.
host, err := n.vnet.NewHost(context.Background(), name)
if host != nil {
return host
}
netladdr = &Addr{Net: h.Network(), Host: host.name, Port: port}
// the only way we could get error here is due to either someone else
// making the host in parallel to us, or virtnet shutdown.
switch errors.Cause(err) {
case virtnet.ErrHostDup:
// ok
case virtnet.ErrNetDown:
panic(err)
if host != h {
return nil, lerr(errAddrNoListen)
default:
panic(fmt.Sprintf("pipenet: NewHost failed not due to dup or shutdown: %s", err))
}
// find first free port if autobind requested
if port == 0 {
sk = h.allocFreeSocket()
// else allocate socket in-place
} else {
// grow if needed
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
if h.socketv[port] != nil {
return nil, lerr(errAddrAlreadyUsed)
}
sk = &socket{host: h, port: port}
h.socketv[port] = sk
// if it was dup - we should be able to get it.
//
// even if dup.Close is called in the meantime it will mark the host as
// down, but won't remove it from vnet .hostMap.
host = n.vnet.Host(name)
if host == nil {
panic(fmt.Sprintf("pipenet: NewHost said host already is there, but it was not found"))
}
// create listener under socket
l := &listener{
socket: sk,
dialq: make(chan dialReq),
down: make(chan struct{}),
}
sk.listener = l
return host
}
return l, nil
// VNetNewHost implements virtnet.Engine .
func (v *vengine) VNetNewHost(ctx context.Context, hostname string, registry virtnet.Registry) error {
// for pipenet there is neither need to create host resources, nor need
// to keep any hostdata.
return registry.Announce(ctx, hostname, "")
}
// Close closes the listener.
// VNetDial implements virtnet dialing for pipenet.
//
// It interrupts all currently in-flight calls to Accept.
func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
sk := l.socket
h := sk.host
n := h.network
n.mu.Lock()
defer n.mu.Unlock()
sk.listener = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
return nil
}
// Simply create pipe pair and send one end directly to virtnet acceptor.
func (v *vengine) VNetDial(ctx context.Context, src, dst *virtnet.Addr, _ string) (_ net.Conn, addrAccept *virtnet.Addr, _ error) {
pc, ps := net.Pipe()
accept, err := v.network.vnotify.VNetAccept(ctx, src, dst, ps)
if err != nil {
pc.Close()
ps.Close()
return nil, nil, err
}
// Accept tries to connect to Dial called with addr corresponding to our listener.
func (l *listener) Accept() (net.Conn, error) {
h := l.socket.host
n := h.network
accept.Ack <- nil
return pc, accept.Addr, nil
}
select {
case <-l.down:
return nil, &net.OpError{Op: "accept", Net: h.Network(), Addr: l.Addr(), Err: errNetClosed}
// Close implements virtnet.Engine .
func (v *vengine) Close() error {
return nil // nop: there is no underlying resources to release.
}
case req := <-l.dialq:
// someone dialed us - let's connect
pc, ps := net.Pipe()
// allocate sockets and register conns to Network under them
n.mu.Lock()
skc := req.from.allocFreeSocket()
sks := h.allocFreeSocket()
skc.conn = &conn{socket: skc, peersk: sks, Conn: pc}
sks.conn = &conn{socket: sks, peersk: skc, Conn: ps}
// Announce implements virtnet.Registry .
func (r *ramRegistry) Announce(ctx context.Context, hostname, hostdata string) (err error) {
defer r.regerr(&err, "announce", hostname, hostdata)
n.mu.Unlock()
r.mu.Lock()
defer r.mu.Unlock()
req.resp <- skc.conn
return sks.conn, nil
if r.closed {
return virtnet.ErrRegistryDown
}
}
// Dial dials address on the network.
//
// It tries to connect to Accept called on listener corresponding to addr.
func (h *Host) Dial(ctx context.Context, addr string) (net.Conn, error) {
var netaddr net.Addr
derr := func(err error) error {
return &net.OpError{Op: "dial", Net: h.Network(), Addr: netaddr, Err: err}
if _, already := r.hostTab[hostname]; already {
return virtnet.ErrHostDup
}
n := h.network
n.mu.Lock()
r.hostTab[hostname] = hostdata
return nil
}
host, port, err := h.resolveAddr(addr)
if err != nil {
n.mu.Unlock()
return nil, derr(err)
}
// Query implements virtnet.Registry .
func (r *ramRegistry) Query(ctx context.Context, hostname string) (hostdata string, err error) {
defer r.regerr(&err, "query", hostname)
netaddr = &Addr{Net: h.Network(), Host: host.name, Port: port}
r.mu.Lock()
defer r.mu.Unlock()
if port >= len(host.socketv) {
n.mu.Unlock()
return nil, derr(errConnRefused)
if r.closed {
return "", virtnet.ErrRegistryDown
}
sks := host.socketv[port]
if sks == nil || sks.listener == nil {
n.mu.Unlock()
return nil, derr(errConnRefused)
hostdata, ok := r.hostTab[hostname]
if !ok {
return "", virtnet.ErrNoHost
}
l := sks.listener
// NOTE Accept is locking n.mu -> we must release n.mu before sending dial request
n.mu.Unlock()
resp := make(chan net.Conn)
select {
case <-ctx.Done():
return nil, derr(ctx.Err())
case <-l.down:
return nil, derr(errConnRefused)
case l.dialq <- dialReq{from: h, resp: resp}:
return <-resp, nil
}
return hostdata, nil
}
// Close closes network endpoint and unregisters conn from Host.
//
// All currently in-flight blocked IO is interrupted with an error.
func (c *conn) Close() (err error) {
c.closeOnce.Do(func() {
err = c.Conn.Close()
sk := c.socket
h := sk.host
n := h.network
n.mu.Lock()
defer n.mu.Unlock()
sk.conn = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
// Close implements virtnet.Registry .
func (r *ramRegistry) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
return err
r.closed = true
return nil
}
// LocalAddr implements net.Conn.
//
// it returns pipenet address of local end of connection.
func (c *conn) LocalAddr() net.Addr {
return c.socket.addr()
func newRAMRegistry(name string) *ramRegistry {
return &ramRegistry{name: name, hostTab: make(map[string]string)}
}
// RemoteAddr implements net.Conn.
// regerr is syntactic sugar to wrap !nil *errp into RegistryError.
//
// it returns pipenet address of remote end of connection.
func (c *conn) RemoteAddr() net.Addr {
return c.peersk.addr()
}
// ----------------------------------------
// allocFreeSocket finds first free port and allocates socket entry for it.
// intended too be used like
//
// must be called with Network.mu held.
func (h *Host) allocFreeSocket() *socket {
// find first free port
port := 1 // never allocate port 0 - it is used for autobind on listen only
for ; port < len(h.socketv); port++ {
if h.socketv[port] == nil {
break
}
}
// if all busy it exits with port >= len(h.socketv)
// grow if needed
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
// defer r.regerr(&err, "operation", arg1, arg2, ...)
func (r *ramRegistry) regerr(errp *error, op string, args ...interface{}) {
if *errp == nil {
return
}
sk := &socket{host: h, port: port}
h.socketv[port] = sk
return sk
}
// empty checks whether socket's both conn and listener are all nil.
func (sk *socket) empty() bool {
return sk.conn == nil && sk.listener == nil
}
// addr returns address corresponding to socket.
func (sk *socket) addr() *Addr {
h := sk.host
return &Addr{Net: h.Network(), Host: h.name, Port: sk.port}
}
func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// ParseAddr parses addr into pipenet address.
func (n *Network) ParseAddr(addr string) (*Addr, error) {
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
*errp = &virtnet.RegistryError{
Registry: r.name,
Op: op,
Args: args,
Err: *errp,
}
port, err := strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, &net.AddrError{Err: "invalid port", Addr: addr}
}
return &Addr{Net: n.Network(), Host: host, Port: port}, nil
}
// Addr returns address where listener is accepting incoming connections.
func (l *listener) Addr() net.Addr {
return l.socket.addr()
}
// Network returns full network name of this network.
func (n *Network) Network() string { return NetPrefix + n.name }
// Network returns full network name of underlying network.
func (h *Host) Network() string { return h.network.Network() }
// Name returns host name.
func (h *Host) Name() string { return h.name }
......@@ -20,133 +20,38 @@
package pipenet
import (
"context"
"fmt"
"io"
"net"
"reflect"
"testing"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/internal/xtesting"
"lab.nexedi.com/kirr/go123/xnet/internal/virtnettest"
)
// we assume net.Pipe works ok; here we only test Listen/Accept/Dial routing
// XXX tests are ugly, non-robust and small coverage
type mklistener interface {
Listen(string) (net.Listener, error)
}
func xlisten(n mklistener, laddr string) net.Listener {
l, err := n.Listen(laddr)
exc.Raiseif(err)
return l
}
func xaccept(l net.Listener) net.Conn {
c, err := l.Accept()
exc.Raiseif(err)
return c
}
type dialer interface {
Dial(context.Context, string) (net.Conn, error)
}
func xdial(n dialer, addr string) net.Conn {
c, err := n.Dial(context.Background(), addr)
exc.Raiseif(err)
return c
}
func xread(r io.Reader) string {
buf := make([]byte, 4096)
n, err := r.Read(buf)
exc.Raiseif(err)
return string(buf[:n])
}
func xwrite(w io.Writer, data string) {
_, err := w.Write([]byte(data))
exc.Raiseif(err)
}
func xwait(w interface { Wait() error }) {
err := w.Wait()
exc.Raiseif(err)
}
func assertEq(t *testing.T, a, b interface{}) {
t.Helper()
if !reflect.DeepEqual(a, b) {
fmt.Printf("not equal:\nhave: %v\nwant: %v\n", a, b)
t.Errorf("not equal:\nhave: %v\nwant: %v", a, b)
exc.Raise(0)
}
func TestPipeNet(t *testing.T) {
virtnettest.TestBasic(t, New("t").vnet)
}
func TestPipeNet(t *testing.T) {
// pipenet has a bit different API than virtnet: Host has no error and returns
// same instance if called twice, not dup, etc. Test it.
func TestPipeNet2(t *testing.T) {
assert := xtesting.Assert(t)
pnet := New("t")
xaddr := func(addr string) *Addr {
a, err := pnet.ParseAddr(addr)
exc.Raiseif(err)
return a
}
:= pnet.Host("α")
:= pnet.Host("β")
assertEq(t, .Network(), "pipet")
assertEq(t, .Network(), "pipet")
assertEq(t, .Name(), "α")
assertEq(t, .Name(), "β")
_, err := .Dial(context.Background(), ":0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipet", Addr: xaddr("α:0"), Err: errConnRefused})
l1 := xlisten(, "")
assertEq(t, l1.Addr(), xaddr("α:1"))
// zero port always stays unused even after autobind
_, err = .Dial(context.Background(), ":0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipet", Addr: xaddr("α:0"), Err: errConnRefused})
wg := &errgroup.Group{}
wg.Go(exc.Funcx(func() {
c1s := xaccept(l1)
assertEq(t, c1s.LocalAddr(), xaddr("α:2"))
assertEq(t, c1s.RemoteAddr(), xaddr("β:1"))
assert.Eq(.Network(), "pipet")
assert.Eq(.Network(), "pipet")
assert.Eq(.Name(), "α")
assert.Eq(.Name(), "β")
assertEq(t, xread(c1s), "ping")
xwrite(c1s, "pong")
hα2 := pnet.Host(")
hβ2 := pnet.Host(")
c2s := xaccept(l1)
assertEq(t, c2s.LocalAddr(), xaddr("α:3"))
assertEq(t, c2s.RemoteAddr(), xaddr("β:2"))
assertEq(t, xread(c2s), "hello")
xwrite(c2s, "world")
}))
c1c := xdial(, "α:1")
assertEq(t, c1c.LocalAddr(), xaddr("β:1"))
assertEq(t, c1c.RemoteAddr(), xaddr("α:2"))
xwrite(c1c, "ping")
assertEq(t, xread(c1c), "pong")
c2c := xdial(, "α:1")
assertEq(t, c2c.LocalAddr(), xaddr("β:2"))
assertEq(t, c2c.RemoteAddr(), xaddr("α:3"))
xwrite(c2c, "hello")
assertEq(t, xread(c2c), "world")
xwait(wg)
if !( == hα2 && == hβ2) {
t.Fatalf("Host(x) is not idempotent")
}
l2 := xlisten(, ":0") // autobind again
assertEq(t, l2.Addr(), xaddr("α:4"))
if AsVirtNet(pnet) != pnet.vnet {
t.Fatal("AsVirtNet broken")
}
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package virtnet
func SubnetShutdown(n *SubNetwork, err error) {
n.shutdown(err)
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package virtnet
// interfaces that virtnet uses in its working.
import (
"context"
"errors"
"fmt"
"net"
)
// Engine is the interface for particular virtnet network implementation to be
// used by SubNetwork.
//
// A virtnet network implementation should provide Engine instance to
// SubNetwork when creating it. The subnetwork will use provided engine for its
// operations.
//
// It should be safe to access Engine from multiple goroutines simultaneously.
type Engine interface {
// VNetNewHost creates resources for host and announces it to registry.
//
// VNetNewHost should create resources for new host and announce
// hostname to provided registry. When announcing it should encode in
// hostdata a way for VNetDial - potentially run on another subnetwork
// - to find out where to connect to when dialing to this host.
//
// On error the returned error will be wrapped by virtnet with "new
// host" operation and hostname.
VNetNewHost(ctx context.Context, hostname string, registry Registry) error
// VNetDial creates outbound virtnet connection.
//
// VNetDial, given destination virtnet address and destination
// hostdata, should establish connection to destination. It should let
// remote side know that its peer virtnet address is src.
//
// On success net.Conn that will be handling data exchange via its
// Read/Write should be returned. This net.Conn will be wrapped by
// virtnet with overwritten LocalAddr and RemoteAddr to be src and
// addrAccept correspondingly.
//
// On error the returned error will be wrapped by virtnet with
// corresponding net.OpError{"dial", src, dst}.
//
// Virtnet always passes to VNetDial src and dst with the same network
// name that was used when creating corresponding SubNetwork.
VNetDial(ctx context.Context, src, dst *Addr, dsthostdata string) (_ net.Conn, addrAccept *Addr, _ error)
// Close shuts down subnetwork engine.
//
// Close should close engine resources and return corresponding error.
//
// There is no need to explicitly interrupt other engine operations -
// to those virtnet always passes ctx that is canceled before
// engine.Close is called.
Close() error
}
// Notifier is the interface to be used by particular virtnet network
// implementation for notifying SubNetwork.
//
// A virtnet network implementation receives instance of Notifier together with
// SubNetwork when creating it. The network implementation should use provided
// Notifier to notify the subnetwork to handle incoming events.
//
// It should be safe to access Notifier from multiple goroutines simultaneously.
type Notifier interface {
// VNetAccept notifies virtnet about incoming connection.
//
// VNetAccept, given destination virtnet address, should make decision
// to either accept or reject provided connection.
//
// On success the connection is pre-accepted and corresponding Accept
// is returned to virtnet network implementation.
//
// On error an error is returned without any "accept" prefix, e.g.
// ErrConnRefused. Such accept prefix should be provided by network
// implementation that is using VNetAccept.
VNetAccept(ctx context.Context, src, dst *Addr, netconn net.Conn) (*Accept, error)
// VNetDown notifies virtnet that underlying network is down.
//
// Provided err describes the cause of why the network is down.
VNetDown(err error)
}
// Accept represents successful acceptance decision from Notifier.VNetAccept .
//
// On successful accept decision corresponding virtnet-level Accept() is
// waiting on .Ack to continue and will check the error from there.
//
// On success the connection will be finally accepted and corresponding virtnet
// listener will be notified. Provided netconn will be adjusted by virtnet
// internally with overwritten LocalAddr and RemoteAddr to be correspondingly
// .Addr and src that was originally passed to VNetAccept.
//
// On error the acceptance will be canceled.
type Accept struct {
Addr *Addr // accepting with this local address
Ack chan error
}
// Registry represents access to a virtnet network registry.
//
// A virtnet network implementation should provide Registry instance to
// SubNetwork when creating it. The subnetwork will eventually use it when
// creating hosts via NewHost, and in turn creating outbound connections on
// them via Host.Dial.
//
// The registry holds information about hosts available on the network, and
// for each host additional data about it. Whenever host α needs to establish
// connection to address on host β, it queries the registry for β.
// Correspondingly when a host joins the network, it announces itself to the
// registry so that other hosts could see it.
//
// The registry could be implemented in several ways, for example:
//
// - dedicated network server,
// - hosts broadcasting information to each other similar to ARP,
// - shared memory or file,
// - ...
//
// It should be safe to access registry from multiple goroutines simultaneously.
type Registry interface {
// Announce announces host to registry.
//
// Returned error, if !nil, is *RegistryError with .Err describing the
// error cause:
//
// - ErrRegistryDown if registry cannot be accessed,
// - ErrHostDup if hostname was already announced,
// - some other error indicating e.g. IO problem.
Announce(ctx context.Context, hostname, hostdata string) error
// Query queries registry for host.
//
// Returned error, if !nil, is *RegistryError with .Err describing the
// error cause:
//
// - ErrRegistryDown if registry cannot be accessed,
// - ErrNoHost if hostname was not announced to registry,
// - some other error indicating e.g. IO problem.
Query(ctx context.Context, hostname string) (hostdata string, _ error)
// Close closes access to registry.
//
// Close should close registry resources and return corresponding error.
//
// There is no need to explicitly interrupt other registry operations -
// to those virtnet always passes ctx that is canceled before
// registry.Close is called.
Close() error
}
var (
ErrRegistryDown = errors.New("registry is down")
ErrNoHost = errors.New("no such host")
ErrHostDup = errors.New("host already registered")
)
// RegistryError represents an error of a registry operation.
type RegistryError struct {
Registry string // name of the registry
Op string // operation that failed
Args interface{} // operation arguments, if any
Err error // actual error that occurred during the operation
}
func (e *RegistryError) Error() string {
s := e.Registry + ": " + e.Op
if e.Args != nil {
s += fmt.Sprintf(" %q", e.Args)
}
s += ": " + e.Err.Error()
return s
}
func (e *RegistryError) Cause() error {
return e.Err
}
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// Package virtnet provides infrastructure for TCP-like virtual networks.
//
// For testing distributed systems it is sometimes handy to imitate network of
// several TCP hosts. It is also handy that ports allocated on Dial/Listen/Accept
// on that hosts be predictable - that would help tests to verify network
// events against expected sequence.
//
// Package virtnet provides infrastructure for using and implementing such
// TCP-like virtual networks.
//
//
// Using virtnet networks
//
// Addresses on a virtnet network are host:port pairs represented by Addr.
// A network conceptually consists of several SubNetworks each being home for
// multiple Hosts. Host is xnet.Networker and so can be worked with similarly
// to regular TCP network access-point with Dial/Listen/Accept. Host's ports
// allocation is predictable: ports of a host are contiguous integer sequence
// starting from 1 that are all initially free, and whenever autobind is
// requested the first free port of the host will be used.
// Virtnet ensures that host names are unique throughout whole network.
//
// To work with a virtnet network, one uses corresponding package for
// particular virtnet network implementation. Such packages provide a way to
// join particular network and after joining give back SubNetwork to user.
// Starting from SubNetwork one can create Hosts and from those exchange data
// throughout whole network.
//
// Please see package lab.nexedi.com/kirr/go123/xnet/pipenet for particular
// well-known virtnet-based network.
//
//
// Implementing virtnet networks
//
// To implement a virtnet-based network one need to implement Engine and Registry.
//
// A virtnet network implementation should provide Engine and Registry
// instances to SubNetwork when creating it. The subnetwork will use provided
// engine and registry for its operations. A virtnet network implementation
// receives instance of Notifier together with SubNetwork when creating it. The
// network implementation should use provided Notifier to notify the subnetwork
// to handle incoming events.
//
// Please see Engine, Registry and Notifier documentation for details.
package virtnet
// TODO Fix virtnet for TCP semantic: there port(accepted) = port(listen), i.e.
// When we connect www.nexedi.com:80, remote addr of socket will have port 80.
// Likewise on server side accepted socket will have local port 80.
// The connection should be thus fully identified by src-dst address pair.
import (
"context"
"errors"
"io"
"net"
"strconv"
"sync"
"sync/atomic"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
)
var (
ErrNetDown = errors.New("network is down")
ErrHostDown = errors.New("host is down")
ErrSockDown = errors.New("socket is down")
ErrAddrAlreadyUsed = errors.New("address already in use")
ErrAddrNoListen = errors.New("cannot listen on requested address")
ErrConnRefused = errors.New("connection refused")
)
// Addr represents address of a virtnet endpoint.
type Addr struct {
Net string // full network name, e.g. "pipeα" or "lonetβ"
Host string // name of host access point on the network
Port int // port on host
}
// SubNetwork represents one subnetwork of a virtnet network.
//
// Multiple Hosts could be created on one subnetwork.
// Multiple subnetworks could be part of a single virtnet network.
//
// Host names are unique through whole virtnet network.
//
// SubNetwork should be created by a particular virtnet network implementation
// via NewSubNetwork.
type SubNetwork struct {
// full network name, e.g. "pipeα" or "lonetβ"
network string
// virtnet network implementation and registry given to us
engine Engine
registry Registry
// {} hostname -> Host
hostMu sync.Mutex
hostMap map[string]*Host
down chan struct{} // closed when no longer operational
downErr error
downOnce sync.Once
}
// Host represents named access point on a virtnet network.
//
// A Host belongs to a SubNetwork.
// It has name and array of sockets indexed by port.
// It implements xnet.Networker.
//
// It is safe to use Host from multiple goroutines simultaneously.
type Host struct {
subnet *SubNetwork
name string
// [] port -> listener | conn ; [0] is always nil
sockMu sync.Mutex
socketv []*socket
down chan struct{} // closed when no longer operational
downOnce sync.Once
}
var _ xnet.Networker = (*Host)(nil)
// socket represents one endpoint entry on Host.
//
// it can be either already connected or listening.
type socket struct {
host *Host // host/port this socket is bound to
port int
conn *conn // connection endpoint is here if != nil
listener *listener // listener is waiting here if != nil
}
// conn represents one endpoint of a virtnet connection.
//
// conn is the net.Conn implementation that Host.Dial and listener.Accept return.
type conn struct {
socket *socket // local socket
peerAddr *Addr // address of the remote side of this connection
net.Conn
down uint32 // 1 after shutdown
downOnce sync.Once
errClose error // error we got from closing underlying net.Conn
closeOnce sync.Once
}
// listener implements net.Listener for Host.Listen .
type listener struct {
// subnetwork/host/port we are listening on
socket *socket
dialq chan dialReq // Dial requests to our port go here
down chan struct{} // closed when no longer operational
downOnce sync.Once
closeOnce sync.Once
}
// dialReq represents one dial request to listener from acceptor.
type dialReq struct {
from *Addr
conn net.Conn
resp chan *Accept
}
// notifier implements Notifier for SubNetwork.
//
// it is separate from SubNetwork not to generally expose Notifier as API
// virtnet users (contrary to virtnet network implementers) should use.
type notifier struct {
subnet *SubNetwork
}
// ----------------------------------------
// NewSubNetwork creates new SubNetwork with given name.
//
// It should be used by virtnet network implementations who should provide it
// with Engine and Registry instances.
//
// Together with returned SubNetwork an instance of Notifier is provided that
// should be used by virtnet network implementation to notify created
// subnetwork to handle incoming events.
func NewSubNetwork(network string, engine Engine, registry Registry) (*SubNetwork, Notifier) {
// XXX prefix network with "virtnet/" ?
subnet := &SubNetwork{
network: network,
engine: engine,
registry: registry,
hostMap: make(map[string]*Host),
down: make(chan struct{}),
}
return subnet, &notifier{subnet}
}
// shutdown shutdowns subnetwork.
//
// It is worker for Close and VNetDown.
//
// The error provided is the cause of shutdown - e.g. IO error from engine, or
// nil on plain close.
//
// It is safe to call shutdown multiple times and from multiple goroutines
// simultaneously - only the first call has the effect.
//
// The error returned is cumulative shutdown error - the cause + any error from
// closing engine and registry for the call when shutdown was actually performed.
func (n *SubNetwork) shutdown(err error) error {
n.downOnce.Do(func() {
close(n.down)
// shutdown hosts
n.hostMu.Lock()
for _, host := range n.hostMap {
host.shutdown()
}
n.hostMu.Unlock()
var errv xerr.Errorv
errv.Appendif( err )
errv.Appendif( n.engine.Close() )
errv.Appendif( n.registry.Close() )
n.downErr = errv.Err()
})
return n.downErr
}
// Close shutdowns subnetwork.
//
// It recursively interrupts all blocking operations on the subnetwork and
// shutdowns all subnetwork's hosts and connections.
func (n *SubNetwork) Close() (err error) {
defer xerr.Contextf(&err, "virtnet %q: close", n.network)
return n.shutdown(nil)
}
// VNetDown implements Notifier by shutting subnetwork down upon engine error.
func (nn *notifier) VNetDown(err error) {
nn.subnet.shutdown(err)
}
// NewHost creates new Host with given name.
//
// The host will be associated with SubNetwork via which it was created.
//
// Host names should be unique through whole virtnet network.
// If not - an error with ErrHostDup cause will be returned.
func (n *SubNetwork) NewHost(ctx context.Context, name string) (_ *Host, err error) {
defer xerr.Contextf(&err, "virtnet %q: new host %q", n.network, name)
// cancel on network shutdown
origCtx := ctx
ctx, cancel := xcontext.MergeChan(ctx, n.down); defer cancel()
// announce new host
err = n.engine.VNetNewHost(ctx, name, n.registry)
if err != nil {
if ctx.Err() != nil && origCtx.Err() == nil {
// error due to subnetwork shutdown
err = ErrNetDown
}
return nil, err
}
// announced ok -> host can be created
n.hostMu.Lock()
defer n.hostMu.Unlock()
if n.hostMap[name] != nil {
panic("announced ok but .hostMap already !empty")
}
host := &Host{subnet: n, name: name, down: make(chan struct{})}
n.hostMap[name] = host
return host, nil
}
// Host returns host on the subnetwork by name.
//
// If there is no such host - nil is returned.
func (n *SubNetwork) Host(name string) *Host {
n.hostMu.Lock()
defer n.hostMu.Unlock()
return n.hostMap[name]
}
// shutdown is underlying worker for Close.
func (h *Host) shutdown() {
h.downOnce.Do(func() {
close(h.down)
// shutdown all sockets
h.sockMu.Lock()
defer h.sockMu.Unlock()
for _, sk := range h.socketv {
if sk == nil {
continue
}
if sk.conn != nil {
sk.conn.shutdown()
}
if sk.listener != nil {
sk.listener.shutdown()
}
}
})
}
// Close shutdowns host.
//
// After host is shutdown connections to it cannot be established and all
// currently-established connections are shut down.
//
// Close interrupts all currently in-flight blocked I/O operations on Host or
// objects created from it: connections and listeners.
func (h *Host) Close() (err error) {
defer xerr.Contextf(&err, "virtnet %q: host %q: close", h.subnet.network, h.name)
h.shutdown()
return nil
}
// Listen starts new listener on the host.
//
// It either allocates free port if laddr is "" or with 0 port, or binds to laddr.
// Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept.
func (h *Host) Listen(laddr string) (_ net.Listener, err error) {
var netladdr net.Addr
defer func() {
if err != nil {
err = &net.OpError{Op: "listen", Net: h.Network(), Addr: netladdr, Err: err}
}
}()
if laddr == "" {
laddr = ":0"
}
a, err := h.parseAddr(laddr)
if err != nil {
return nil, err
}
netladdr = a
// cannot listen on other hosts
if a.Host != h.name {
return nil, ErrAddrNoListen
}
if ready(h.down) {
return nil, h.errDown()
}
h.sockMu.Lock()
defer h.sockMu.Unlock()
var sk *socket
// find first free port if autobind requested
if a.Port == 0 {
sk = h.allocFreeSocket()
// else allocate socket in-place
} else {
// grow if needed
for a.Port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
if h.socketv[a.Port] != nil {
return nil, ErrAddrAlreadyUsed
}
sk = &socket{host: h, port: a.Port}
h.socketv[a.Port] = sk
}
// create listener under socket
l := &listener{
socket: sk,
dialq: make(chan dialReq),
down: make(chan struct{}),
}
sk.listener = l
return l, nil
}
// shutdown shutdowns the listener.
//
// It interrupts all currently in-flight calls to Accept, but does not
// unregister listener from host's socket map.
func (l *listener) shutdown() {
l.downOnce.Do(func() {
close(l.down)
})
}
// Close closes the listener.
//
// It interrupts all currently in-flight calls to Accept.
func (l *listener) Close() error {
l.shutdown()
l.closeOnce.Do(func() {
sk := l.socket
h := sk.host
h.sockMu.Lock()
defer h.sockMu.Unlock()
sk.listener = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
return nil
}
// Accept tries to connect to Dial called with addr corresponding to our listener.
func (l *listener) Accept() (_ net.Conn, err error) {
h := l.socket.host
defer func() {
if err != nil {
err = &net.OpError{Op: "accept", Net: h.Network(), Addr: l.Addr(), Err: err}
}
}()
for {
var req dialReq
select {
case <-l.down:
return nil, l.errDown()
case req = <-l.dialq:
// ok
}
// acceptor dials us - allocate empty socket so that we know accept address.
h.sockMu.Lock()
sk := h.allocFreeSocket()
h.sockMu.Unlock()
// give acceptor feedback that we are accepting the connection.
ack := make(chan error)
req.resp <- &Accept{sk.addr(), ack}
// wait for ack from acceptor.
select {
case <-l.down:
// acceptor was slow and we have to shutdown the listener.
// we have to make sure we still receive on ack and
// close req.conn / unallocate the socket appropriately.
go func() {
err := <-ack
if err == nil {
// acceptor conveyed us the connection - close it
req.conn.Close()
}
h.sockMu.Lock()
h.socketv[sk.port] = nil
h.sockMu.Unlock()
}()
return nil, l.errDown()
case err = <-ack:
// ok
}
// we got feedback from acceptor
// if there is an error - unallocate the socket and continue waiting.
if err != nil {
h.sockMu.Lock()
h.socketv[sk.port] = nil
h.sockMu.Unlock()
continue
}
// all ok - allocate conn, bind it to socket and we are done.
c := &conn{socket: sk, peerAddr: req.from, Conn: req.conn}
h.sockMu.Lock()
sk.conn = c
h.sockMu.Unlock()
return c, nil
}
}
// VNetAccept implements Notifier by accepting or rejecting incoming connection.
func (nn *notifier) VNetAccept(ctx context.Context, src, dst *Addr, netconn net.Conn) (*Accept, error) {
n := nn.subnet
n.hostMu.Lock()
host := n.hostMap[dst.Host]
n.hostMu.Unlock()
if host == nil {
return nil, &net.AddrError{Err: "no such host", Addr: dst.String()}
}
host.sockMu.Lock()
if dst.Port >= len(host.socketv) {
host.sockMu.Unlock()
return nil, ErrConnRefused
}
sk := host.socketv[dst.Port]
if sk == nil || sk.listener == nil {
host.sockMu.Unlock()
return nil, ErrConnRefused
}
// there is listener corresponding to dst - let's connect it
l := sk.listener
host.sockMu.Unlock()
resp := make(chan *Accept)
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-l.down:
return nil, ErrConnRefused
case l.dialq <- dialReq{from: src, conn: netconn, resp: resp}:
return <-resp, nil
}
}
// Dial dials address on the network.
//
// It tries to connect to Accept called on listener corresponding to addr.
func (h *Host) Dial(ctx context.Context, addr string) (_ net.Conn, err error) {
// allocate socket in empty state early, so we can see in the error who
// tries to dial.
h.sockMu.Lock()
sk := h.allocFreeSocket()
h.sockMu.Unlock()
defer func() {
if err != nil {
h.sockMu.Lock()
h.socketv[sk.port] = nil
h.sockMu.Unlock()
}
}()
var netdst net.Addr
defer func() {
if err != nil {
err = &net.OpError{Op: "dial", Net: h.Network(), Source: sk.addr(), Addr: netdst, Err: err}
}
}()
dst, err := h.parseAddr(addr)
if err != nil {
return nil, err
}
netdst = dst
n := h.subnet
// cancel on host shutdown
origCtx := ctx
ctx, cancel := xcontext.MergeChan(ctx, h.down); defer cancel()
errOrDown := func(err error) error {
if ctx.Err() != nil && origCtx.Err() == nil {
// error due to shutdown
return h.errDown()
}
return err
}
// query registry
dstdata, err := n.registry.Query(ctx, dst.Host)
if err != nil {
return nil, errOrDown(err)
}
// dial engine
netconn, acceptAddr, err := n.engine.VNetDial(ctx, sk.addr(), dst, dstdata)
if err != nil {
return nil, errOrDown(err)
}
// handshake performed ok - we are done.
c := &conn{socket: sk, peerAddr: acceptAddr, Conn: netconn}
h.sockMu.Lock()
sk.conn = c
h.sockMu.Unlock()
return c, nil
}
// ---- conn ----
// shutdown closes underlying network connection.
func (c *conn) shutdown() {
c.downOnce.Do(func() {
atomic.StoreUint32(&c.down, 1)
c.errClose = c.Conn.Close()
})
}
// Close closes network endpoint and unregisters conn from Host.
//
// All currently in-flight blocked IO is interrupted with an error.
func (c *conn) Close() error {
c.shutdown()
c.closeOnce.Do(func() {
sk := c.socket
h := sk.host
h.sockMu.Lock()
defer h.sockMu.Unlock()
sk.conn = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
return c.errClose
}
// Read implements net.Conn .
//
// it delegates the read to underlying net.Conn but amends error if it was due
// to conn shutdown.
func (c *conn) Read(p []byte) (int, error) {
n, err := c.Conn.Read(p)
if err != nil && err != io.EOF {
if !errIsTimeout(err) {
// an error that might be due to shutdown
err = c.errOrDown(err)
}
// wrap error to be at virtnet level.
// net.OpError preserve .Timeout() value if .Err has it.
err = &net.OpError{
Op: "read",
Net: c.socket.host.Network(),
Addr: c.LocalAddr(),
Source: c.RemoteAddr(),
Err: err,
}
}
return n, err
}
// Write implements net.Conn .
//
// it delegates the write to underlying net.Conn but amends error if it was due
// to conn shutdown.
func (c *conn) Write(p []byte) (int, error) {
n, err := c.Conn.Write(p)
if err != nil {
if !errIsTimeout(err) {
err = c.errOrDown(err)
}
err = &net.OpError{
Op: "write",
Net: c.socket.host.Network(),
Addr: c.RemoteAddr(),
Source: c.LocalAddr(),
Err: err,
}
}
return n, err
}
// LocalAddr implements net.Conn.
//
// it returns virtnet address of local end of connection.
func (c *conn) LocalAddr() net.Addr {
return c.socket.addr()
}
// RemoteAddr implements net.Conn .
//
// it returns virtnet address of remote end of connection.
func (c *conn) RemoteAddr() net.Addr {
return c.peerAddr
}
// ----------------------------------------
// allocFreeSocket finds first free port and allocates socket entry for it.
//
// must be called with SubNetwork.mu held.
func (h *Host) allocFreeSocket() *socket {
// find first free port
port := 1 // never allocate port 0 - it is used for autobind on listen only
for ; port < len(h.socketv); port++ {
if h.socketv[port] == nil {
break
}
}
// if all busy it exits with port >= len(h.socketv)
// grow if needed
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
sk := &socket{host: h, port: port}
h.socketv[port] = sk
return sk
}
// empty checks whether socket's both conn and listener are all nil.
func (sk *socket) empty() bool {
return sk.conn == nil && sk.listener == nil
}
// addr returns address corresponding to socket.
func (sk *socket) addr() *Addr {
h := sk.host
return &Addr{Net: h.Network(), Host: h.name, Port: sk.port}
}
// Network implements net.Addr .
func (a *Addr) Network() string { return a.Net }
// String implements net.Addr .
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// ParseAddr parses addr into virtnet address for named network.
func ParseAddr(network, addr string) (*Addr, error) {
host, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, err
}
port, err := strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, &net.AddrError{Err: "invalid port", Addr: addr}
}
return &Addr{Net: network, Host: host, Port: port}, nil
}
// parseAddr parses addr into virtnet address from host point of view.
//
// It is the same as ParseAddr except empty host string - e.g. as in ":0" -
// is resolved to the host itself.
func (h *Host) parseAddr(addr string) (*Addr, error) {
a, err := ParseAddr(h.Network(), addr)
if err != nil {
return nil, err
}
// local host if host name omitted
if a.Host == "" {
a.Host = h.name
}
return a, nil
}
// Addr returns address where listener is accepting incoming connections.
func (l *listener) Addr() net.Addr {
return l.socket.addr()
}
// Network returns full network name this subnetwork is part of.
func (n *SubNetwork) Network() string { return n.network }
// Network returns full network name of underlying network.
func (h *Host) Network() string { return h.subnet.Network() }
// Name returns host name.
func (h *Host) Name() string { return h.name }
// ----------------------------------------
// errDown returns appropriate error cause when h.down is found ready.
func (h *Host) errDown() error {
switch {
case ready(h.subnet.down):
return ErrNetDown
default:
return ErrHostDown
}
}
// errDown returns appropriate error cause when l.down is found ready.
func (l *listener) errDown() error {
h := l.socket.host
n := h.subnet
switch {
case ready(n.down):
return ErrNetDown
case ready(h.down):
return ErrHostDown
default:
return ErrSockDown
}
}
// errOrDown returns err or shutdown cause if c.shutdown was called.
func (c *conn) errOrDown(err error) error {
// shutdown was not yet called - leave it as is.
if atomic.LoadUint32(&c.down) == 0 {
return err
}
// shutdown was called - find out the reason.
h := c.socket.host
n := h.subnet
switch {
case ready(n.down):
return ErrNetDown
case ready(h.down):
return ErrHostDown
default:
return ErrSockDown
}
}
// ready returns whether chan struct{} is ready.
func ready(ch <-chan struct{}) bool {
select {
case <-ch:
return true
default:
return false
}
}
// errIsTimeout checks whether error is due to timeout.
//
// useful to check because net.Conn says:
//
// "Read/Write can be made to time out and return an Error with Timeout() == true"
func errIsTimeout(err error) bool {
e, ok := err.(interface{ Timeout() bool })
if ok {
return e.Timeout()
}
return false
}
// Copyright (C) 2018 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
package virtnet_test
import (
"context"
"io"
"net"
"strings"
"testing"
"time"
"golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/internal/xtesting"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
. "lab.nexedi.com/kirr/go123/xnet/virtnet"
"github.com/pkg/errors"
)
// testNet is testing network environment.
//
// It consists of a subnetwork backed by pipenet with 2 hosts: hα and hβ. On
// both hosts a listener is started at "" (i.e. it will have ":1" address).
// There is a connection established in between α:2-β:2.
type testNet struct {
testing.TB
net *SubNetwork
, *Host
, net.Listener
cαβ, cβα net.Conn
}
// newTestNet creates new testing network environment.
func newTestNet(t0 testing.TB) *testNet {
t := &testNet{TB: t0}
t.Helper()
var err error
t.net = pipenet.AsVirtNet(pipenet.New("t"))
t., err = t.net.NewHost(context.Background(), "α")
if err != nil {
t.Fatal(err)
}
t., err = t.net.NewHost(context.Background(), "β")
if err != nil {
t.Fatal(err)
}
t., err = t..Listen("")
if err != nil {
t.Fatal(err)
}
t., err = t..Listen("")
if err != nil {
t.Fatal(err)
}
// preestablish α:2-β:2 connection
wg := &errgroup.Group{}
defer func() {
err := wg.Wait()
if err != nil {
t.Fatal(err)
}
}()
wg.Go(func() error {
c, err := t..Accept()
if err != nil {
return err
}
t.cβα = c
return nil
})
c, err := t..Dial(context.Background(), "β:1")
if err != nil {
t.Fatal(err)
}
t.cαβ = c
return t
}
// xneterr constructs net.OpError for testNet network.
//
// if addr is of form "α:1" - only .Addr is set.
// if addr is of form "α:1->β:1" - both .Source and .Addr are set.
func xneterr(op, addr string, err error) *net.OpError {
addrv := strings.Split(addr, "->")
if len(addrv) > 2 {
exc.Raisef("xneterr: invalid addr %q", addr)
}
operr := &net.OpError{
Op: op,
Net: "pipet", // matches newTestNet
Err: err,
}
for i, addr := range addrv {
a, e := ParseAddr("pipet", addr)
exc.Raiseif(e)
if i == len(addrv)-1 {
operr.Addr = a
} else {
operr.Source = a
}
}
return operr
}
// xobject lookups testNet object by name.
func (t *testNet) xobject(name string) io.Closer {
switch name {
case "subnet": return t.net
case "hα": return t.
case "hβ": return t.
case "lα": return t.
case "lβ": return t.
case "cαβ": return t.cαβ
case "cβα": return t.cβα
}
exc.Raisef("invalid object: %q", name)
panic(0)
}
type testFlag int
const serialOnly testFlag = 1
// testClose verifies object.Close vs test func.
//
// object to close is specified by name, e.g. "hβ". test func should try to do
// an action and verify it gets expected error given object is closed.
//
// two scenarios are verified:
//
// - serial case: first close, then test, and
// - concurrent case: close is run in parallel to test.
//
// if concurrent case is not applicable for test (e.g. it tries to run a
// function that does not block, like e.g. NewHost in pipenet case), it can be
// disabled via passing optional serialOnly flag.
func testClose(t0 testing.TB, object string, test func(*testNet), flagv ...testFlag) {
t0.Helper()
// serial case
t := newTestNet(t0)
obj := t.xobject(object)
err := obj.Close()
if err != nil {
t.Fatal(err)
}
test(t)
if len(flagv) > 0 && flagv[0] == serialOnly {
return
}
// concurrent case
t = newTestNet(t0)
obj = t.xobject(object)
wg := &errgroup.Group{}
wg.Go(func() error {
tdelay()
return obj.Close()
})
test(t)
err = wg.Wait()
if err != nil {
t.Fatal(err)
}
}
// tdelay delays a bit.
//
// needed e.g. to test Close interaction with waiting read or write
// (we cannot easily sync and make sure e.g. read is started and became asleep)
func tdelay() {
time.Sleep(1 * time.Millisecond)
}
// TestClose verifies that for all virtnet objects Close properly interrupt /
// errors all corresponding operations.
func TestClose(t *testing.T) {
bg := context.Background()
assert := xtesting.Assert(t)
// Subnet Host listener conn
// NewHost x
// Dial x x x
// Listen x x
// Accept x x x
// Read/Write x x x
// ---- NewHost ----
// subnet.NewHost vs subnet.Close
testClose(t, "subnet", func(t *testNet) {
h, err := t.net.NewHost(bg, "γ")
assert.Eq(h, (*Host)(nil))
assert.Eq(errors.Cause(err), ErrNetDown)
assert.Eq(err.Error(), "virtnet \"pipet\": new host \"γ\": network is down")
}, serialOnly)
// ---- Dial ----
// host.Dial vs subnet.Close
testClose(t, "subnet", func(t *testNet) {
c, err := t..Dial(bg, "β:1")
assert.Eq(c, nil)
assert.Eq(err, xneterr("dial", "α:3->β:1", ErrNetDown))
})
// host1.Dial vs host1.Close
testClose(t, "hα", func(t *testNet) {
c, err := t..Dial(bg, "β:1")
assert.Eq(c, nil)
assert.Eq(err, xneterr("dial", "α:3->β:1", ErrHostDown))
})
// host1.Dial vs host2.Close
testClose(t, "hβ", func(t *testNet) {
c, err := t..Dial(bg, "β:1")
assert.Eq(c, nil)
assert.Eq(err, xneterr("dial", "α:3->β:1", ErrConnRefused))
})
// host1.Dial vs host2.listener.Close
testClose(t, "lβ", func(t *testNet) {
c, err := t..Dial(bg, "β:1")
assert.Eq(c, nil)
assert.Eq(err, xneterr("dial", "α:3->β:1", ErrConnRefused))
})
// ---- Listen ----
// host.Listen vs subnet.Close
testClose(t, "subnet", func(t *testNet) {
l, err := t..Listen("")
assert.Eq(l, nil)
assert.Eq(err, xneterr("listen", "α:0", ErrNetDown))
}, serialOnly)
// host.Listen vs host.Close
testClose(t, "hα", func(t *testNet) {
l, err := t..Listen("")
assert.Eq(l, nil)
assert.Eq(err, xneterr("listen", "α:0", ErrHostDown))
}, serialOnly)
// ---- Accept ----
// listener.Accept vs subnet.Close
testClose(t, "subnet", func(t *testNet) {
c, err := t..Accept()
assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrNetDown))
})
// listener.Accept vs host.Close
testClose(t, "hα", func(t *testNet) {
c, err := t..Accept()
assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrHostDown))
})
// listener.Accept vs listener.Close
testClose(t, "lα", func(t *testNet) {
c, err := t..Accept()
assert.Eq(c, nil)
assert.Eq(err, xneterr("accept", "α:1", ErrSockDown))
})
// ---- Read/Write ----
buf := []byte("hello world!")
// conn.{Read,Write} vs subnet.Close
testClose(t, "subnet", func(t *testNet) {
n, err := t.cαβ.Read(buf)
assert.Eq(n, 0)
// err can be also EOF because subnet.Close closes cβα too and
// depending on scheduling we might first get EOF on our end.
if err != io.EOF {
assert.Eq(err, xneterr("read", "β:2->α:2", ErrNetDown))
}
})
testClose(t, "subnet", func(t *testNet) {
n, err := t.cαβ.Write(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("write", "α:2->β:2", ErrNetDown))
})
// conn1.{Read,Write} vs host1.Close
testClose(t, "hα", func(t *testNet) {
n, err := t.cαβ.Read(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("read", "β:2->α:2", ErrHostDown))
})
testClose(t, "hα", func(t *testNet) {
n, err := t.cαβ.Write(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("write", "α:2->β:2", ErrHostDown))
})
// conn1.{Read,Write} vs host2.Close
testClose(t, "hβ", func(t *testNet) {
n, err := t.cαβ.Read(buf)
assert.Eq(n, 0)
assert.Eq(err, io.EOF)
})
testClose(t, "hβ", func(t *testNet) {
n, err := t.cαβ.Write(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("write", "α:2->β:2", io.ErrClosedPipe))
})
// conn1.{Read,Write} vs conn1.Close
testClose(t, "cαβ", func(t *testNet) {
n, err := t.cαβ.Read(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("read", "β:2->α:2", ErrSockDown))
})
testClose(t, "cαβ", func(t *testNet) {
n, err := t.cαβ.Write(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("write", "α:2->β:2", ErrSockDown))
})
// conn1.{Read,Write} vs conn2.Close
testClose(t, "cβα", func(t *testNet) {
n, err := t.cαβ.Read(buf)
assert.Eq(n, 0)
assert.Eq(err, io.EOF)
})
testClose(t, "cβα", func(t *testNet) {
n, err := t.cαβ.Write(buf)
assert.Eq(n, 0)
assert.Eq(err, xneterr("write", "α:2->β:2", io.ErrClosedPipe))
})
}
// TestVNetDown verifies that engine shutdown error signal is properly handled.
func TestVNetDown(t0 *testing.T) {
assert := xtesting.Assert(t0)
t := newTestNet(t0)
errSomeProblem := errors.New("some problem")
SubnetShutdown(t.net, errSomeProblem) // notifier.VNetDown does this
// SubNetwork.Close = shutdown(nil) and all that interactions were
// verified in TestClose. Here lets check only that we get proper Close error.
err := t.net.Close()
assert.Eq(errors.Cause(err), errSomeProblem)
assert.Eq(err.Error(), "virtnet \"pipet\": close: some problem")
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment