Commit 341b2124 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent b78c529a
......@@ -117,7 +117,12 @@ func (tc *TraceChecker) ExpectNetTx(src, dst string, pkt string) {
func TestMasterStorage(t *testing.T) {
tracer := &MyTracer{xtesting.NewSyncTracer()}
tc := xtesting.NewTraceChecker(t, tracer.SyncTracer)
net := xnet.NetTrace(pipenet.New(""), tracer) // test network
//net := xnet.NetTrace(pipenet.New(""), tracer) // test network
net := pipenet.New("testnet") // test network
Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer)
Maddr := "0"
Saddr := "1"
......@@ -125,7 +130,7 @@ func TestMasterStorage(t *testing.T) {
wg := &xsync.WorkGroup{}
// start master
M := NewMaster("abc1", Maddr, net)
M := NewMaster("abc1", Maddr, Mhost)
Mctx, Mcancel := context.WithCancel(context.Background())
wg.Gox(func() {
err := M.Run(Mctx)
......@@ -158,6 +163,9 @@ func TestMasterStorage(t *testing.T) {
//tc.Expect(nettx("2c", "2s", "\x00\x00\x00\x01"))
// handshake
tc.Expect(nettx("s:1", "m:1", "\x00\x00\x00\x01"))
tc.Expect(nettx("m:1", "s:1", "\x00\x00\x00\x01"))
//tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake
//tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01")
tc.ExpectPar(
......
......@@ -30,11 +30,13 @@ type Networker interface {
// Network returns name of the network XXX recheck
Network() string
// XXX +Addr() net.Addr -> address of this access-point on underlying network ?
// Dial connects to addr on underlying network
// see net.Dial for semantic details
Dial(ctx context.Context, addr string) (net.Conn, error)
// Listen starts listening on local address laddr on underlying network
// Listen starts listening on local address laddr on underlying network access-point
// see net.Listen for semantic details
//
// XXX also introduce xnet.Listener in which Accept() accepts also ctx?
......
......@@ -15,26 +15,23 @@
//
// See COPYING file for full licensing terms.
// Package pipenet provides synchronous in-memory network of net.Pipes
// Package pipenet provides TCP-like synchronous in-memory network of net.Pipes
//
// It can be worked with the same way a regular TCP network is used with
// Dial/Listen/Accept/...
//
// Addresses on pipenet are numbers, indicating serial number of a pipe
// used, plus "c"/"s" suffix depending on whether pipe endpoint was created via
// Dial or Accept.
//
// Address of a listener is just number, which will be in turn used for
// corresponding Dial/Accept as prefix.
// Addresses on pipenet are host:port pairs. A host is xnet.Networker and so
// can be worked with similarly to regular TCP network with Dial/Listen/Accept/...
//
// Example:
//
// XXX adjust
// net := pipenet.New("")
// l, err := net.Listen("10") // starts listening on address "10"
// h1 := net.Host("abc")
// h2 := net.Host("def")
//
// l, err := h1.Listen(":10") // starts listening on address "abc:10"
// go func() {
// csrv, err := l.Accept() // csrv will have LocalAddr "10s"
// csrv, err := l.Accept() // csrv will have LocalAddr "abc:10"
// }()
// ccli, err := net.Dial("10") // ccli will have LocalAddr "10c"
// ccli, err := h2.Dial("abc:10") // ccli will have RemoteAddr "def:10"
//
// Pipenet might be handy for testing interaction of networked applications in 1
// process without going to OS networking stack.
......@@ -54,42 +51,53 @@ const NetPrefix = "pipe" // pipenet package creates only "pipe*" networks
var (
errNetClosed = errors.New("network connection closed")
errAddrAlreadyUsed = errors.New("address already in use")
errAddrNoListen = errors.New("cannot listen on requested address")
errConnRefused = errors.New("connection refused")
)
// Addr represents address of a pipenet endpoint
type Addr struct {
Net string // full network name, e.g. "pipe"
Port int // -1, if anonymous
Endpoint int // 0 (client) | 1 (server) | -1 (listening)
//Addr string // port + c/s depending on connection endpoint
Host string // name of host access point on the network
Port int // port on host XXX -1, if anonymous ?
}
// Network implements synchronous in-memory network of pipes
// XXX text about hosts & ports and routing logic
type Network struct {
// name of this network under "pipe" namespace -> e.g. ""
// full network name will be reported as "pipe"+name
name string
// big network lock for everything dynamic under Network
// (e.g. Host.socketv too)
mu sync.Mutex
entryv []*entry // port -> listener | (conn, conn)
hostMap map[string]*Host
}
// Host represents named access point on Network
type Host struct {
network *Network
name string
// NOTE protected by Network.mu
socketv []*socket // port -> listener | conn
}
// entry represents one Network entry
// it can be either already connected (2 endpoints) or only listening (1 endpoint)
// anything from the above becomes nil when closed
type entry struct {
network *Network
port int
// socket represents one endpoint entry on Network
// it can be either already connected or listening
type socket struct {
host *Host // host/port this socket is binded to
port int
pipev [2]*conn // connection endpoints are there if != nil
conn *conn // connection endpoint is here if != nil
listener *listener // listener is waiting here if != nil
}
// conn represents one endpoint of connection created under Network
type conn struct {
entry *entry
endpoint int // 0 | 1 -> entry.pipev
socket *socket
net.Conn
......@@ -98,15 +106,21 @@ type conn struct {
// listener implements net.Listener for piped network
type listener struct {
// network/port we are listening on
entry *entry
// network/host/port we are listening on
socket *socket
dialq chan chan net.Conn // Dial requests to our port go here
down chan struct{} // Close -> down=ready
dialq chan dialReq // Dial requests to our port go here
down chan struct{} // Close -> down=ready
closeOnce sync.Once
}
// dialReq represents one dial request to listener
type dialReq struct {
from *Host
resp chan net.Conn
}
// ----------------------------------------
// New creates new pipenet Network
......@@ -117,58 +131,97 @@ func New(name string) *Network {
return &Network{name: name}
}
// Host returns network access point by name
// if there was no such host it creates new one
func (n *Network) Host(name string) *Host {
n.mu.Lock()
defer n.mu.Unlock()
host := n.hostMap[name]
if host == nil {
host = &Host{network: n, name: name}
n.hostMap[name] = host
}
return host
}
// resolveAddr resolved addr on the network from the host point of view
// must be called with Network.mu held
func (h *Host) resolveAddr(addr string) (host *Host, port int, err error) {
hoststr, portstr, err := net.SplitHostPort(addr)
if err != nil {
return nil, 0, err
}
port, err = strconv.Atoi(portstr)
if err != nil || port < 0 {
return nil, 0, &net.AddrError{Err: "invalid", Addr: addr}
}
if hoststr == "" {
hoststr = h.name
}
host = h.network.hostMap[hoststr]
if host == nil {
return nil, 0, &net.AddrError{Err: "no such host", Addr: addr}
}
return host, port, nil
}
// Listen starts new listener
// It either allocates free port if laddr is "", or binds to laddr.
// Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept.
func (n *Network) Listen(laddr string) (net.Listener, error) {
var netladdr net.Addr
lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: n.Network(), Addr: netladdr, Err: err}
}
func (h *Host) Listen(laddr string) (net.Listener, error) {
h.network.mu.Lock()
defer h.network.mu.Unlock()
// laddr must be empty or int >= 0
port := -1
if laddr != "" {
port, err := strconv.Atoi(laddr)
if err != nil || port < 0 {
return nil, lerr(&net.AddrError{Err: "invalid", Addr: laddr})
}
}
var sk *socket
netladdr = &Addr{n.Network(), port, -1}
// find first free port if autobind requested
if laddr == "" {
sk = h.allocFreeSocket()
n.mu.Lock()
defer n.mu.Unlock()
// else we resolve/checr address, whether it is already used and if not allocate socket in-place
} else {
var netladdr net.Addr
lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: h.Network(), Addr: netladdr, Err: err}
}
var e *entry
host, port, err := h.resolveAddr(laddr)
if err != nil {
return nil, lerr(err)
}
// find first free port if it was not specified
if port < 0 {
e = n.allocFreeEntry()
netladdr = &Addr{Net: h.Network(), Host: host.name, Port: port}
if host != h {
return nil, lerr(errAddrNoListen)
}
// else we check whether address is already used and if not allocate entry in-place
} else {
// grow if needed
for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil)
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
if n.entryv[port] != nil {
if h.socketv[port] != nil {
return nil, lerr(errAddrAlreadyUsed)
}
e = &entry{network: n, port: port}
n.entryv[port] = e
sk = &socket{host: h, port: port}
h.socketv[port] = sk
}
// create listener under entry
// create listener under socket
l := &listener{
entry: e,
dialq: make(chan chan net.Conn),
socket: sk,
dialq: make(chan dialReq),
down: make(chan struct{}),
}
e.listener = l
sk.listener = l
return l, nil
}
......@@ -179,15 +232,16 @@ func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
e := l.entry
n := e.network
sk := l.socket
h := sk.host
n := h.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
sk.listener = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
return nil
......@@ -195,58 +249,62 @@ func (l *listener) Close() error {
// Accept tries to connect to Dial called with addr corresponding to our listener
func (l *listener) Accept() (net.Conn, error) {
n := l.entry.network
h := l.socket.host
n := h.network
select {
case <-l.down:
return nil, &net.OpError{Op: "accept", Net: n.Network(), Addr: l.Addr(), Err: errNetClosed}
return nil, &net.OpError{Op: "accept", Net: h.Network(), Addr: l.Addr(), Err: errNetClosed}
case resp := <-l.dialq:
case req := <-l.dialq:
// someone dialed us - let's connect
pc, ps := net.Pipe()
// allocate entry and register conns to Network under it
// allocate sockets and register conns to Network under them
n.mu.Lock()
e := n.allocFreeEntry()
e.pipev[0] = &conn{entry: e, endpoint: 0, Conn: pc}
e.pipev[1] = &conn{entry: e, endpoint: 1, Conn: ps}
skc := req.from.allocFreeSocket()
sks := h.allocFreeSocket()
skc.conn = &conn{socket: skc, Conn: pc}
sks.conn = &conn{socket: sks, Conn: ps}
n.mu.Unlock()
resp <- e.pipev[0]
return e.pipev[1], nil
req.resp <- skc.conn
return sks.conn, nil
}
}
// Dial dials address on the network
// It tries to connect to Accept called on listener corresponding to addr.
func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
func (h *Host) Dial(ctx context.Context, addr string) (net.Conn, error) {
var netaddr net.Addr
derr := func(err error) error {
return &net.OpError{Op: "dial", Net: n.Network(), Addr: netaddr, Err: err}
return &net.OpError{Op: "dial", Net: h.Network(), Addr: netaddr, Err: err}
}
port, err := strconv.Atoi(addr)
if err != nil || port < 0 {
return nil, derr(&net.AddrError{Err: "invalid", Addr: addr})
}
n := h.network
n.mu.Lock()
netaddr = &Addr{n.Network(), port, -1}
host, port, err := h.resolveAddr(addr)
if err != nil {
n.mu.Unlock()
return nil, derr(err)
}
n.mu.Lock()
netaddr = &Addr{Net: h.Network(), Host: host.name, Port: port}
if port >= len(n.entryv) {
if port >= len(host.socketv) {
n.mu.Unlock()
return nil, derr(errConnRefused)
}
e := n.entryv[port]
if e == nil || e.listener == nil {
sks := host.socketv[port]
if sks == nil || sks.listener == nil {
n.mu.Unlock()
return nil, derr(errConnRefused)
}
l := e.listener
l := sks.listener
// NOTE Accept is locking n.mu -> we must release n.mu before sending dial request
n.mu.Unlock()
......@@ -259,7 +317,7 @@ func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
case <-l.down:
return nil, derr(errConnRefused)
case l.dialq <- resp:
case l.dialq <- dialReq{from: h, resp: resp}:
return <-resp, nil
}
}
......@@ -270,16 +328,17 @@ func (c *conn) Close() (err error) {
c.closeOnce.Do(func() {
err = c.Conn.Close()
e := c.entry
n := e.network
sk := c.socket
h := sk.host
n := h.network
n.mu.Lock()
defer n.mu.Unlock()
e.pipev[c.endpoint] = nil
h.socketv[sk.port] = nil
if e.empty() {
n.entryv[e.port] = nil
if sk.empty() {
h.socketv[sk.port] = nil
}
})
......@@ -287,71 +346,61 @@ func (c *conn) Close() (err error) {
}
// LocalAddr returns address of local end of connection
// it is entry address + "c" (client) or "s" (server) suffix depending on
// whether pipe endpoint was created via Dial or Accept.
func (c *conn) LocalAddr() net.Addr {
addr := c.entry.addr()
addr.Endpoint = c.endpoint
return addr
return c.socket.addr()
}
// RemoteAddr returns address of remote end of connection
// it is entry address + "c" or "s" suffix -- see LocalAddr for details
func (c *conn) RemoteAddr() net.Addr {
addr := c.entry.addr()
addr.Endpoint = (c.endpoint + 1) % 2
return addr
return c.socket.addr() // FIXME -> must be addr of remote socket
}
// ----------------------------------------
// allocFreeEntry finds first free port and allocates network entry for it
// must be called with .mu held
func (n *Network) allocFreeEntry() *entry {
// find first free port if it was not specified
// allocFreeSocket finds first free port and allocates socket entry for it
// must be called with Network.mu held
func (h *Host) allocFreeSocket() *socket {
// find first free port
port := 0
for ; port < len(n.entryv); port++ {
if n.entryv[port] == nil {
for ; port < len(h.socketv); port++ {
if h.socketv[port] == nil {
break
}
}
// if all busy it exits with port == len(n.entryv)
// if all busy it exits with port == len(h.socketv)
// grow if needed
for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil)
for port >= len(h.socketv) {
h.socketv = append(h.socketv, nil)
}
e := &entry{network: n, port: port}
n.entryv[port] = e
return e
sk := &socket{host: h, port: port}
h.socketv[port] = sk
return sk
}
// empty checks whether entry's both 2 pipe endpoints and listener are all nil
func (e *entry) empty() bool {
return e.pipev[0] == nil && e.pipev[1] == nil && e.listener == nil
// empty checks whether sockets's both pipe endpoint and listener are all nil
func (sk *socket) empty() bool {
return sk.conn == nil && sk.listener == nil
}
// addr returns address corresponding to entry
func (e *entry) addr() *Addr {
return &Addr{Net: e.network.Network(), Port: e.port, Endpoint: -1}
func (sk *socket) addr() *Addr {
h := sk.host
return &Addr{Net: h.Network(), Host: h.name, Port: sk.port}
}
func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string {
addr := strconv.Itoa(a.Port)
if a.Endpoint >= 0 {
addr += string("cs"[a.Endpoint])
}
return addr
}
func (a *Addr) String() string { return net.JoinHostPort(a.Host, strconv.Itoa(a.Port)) }
// Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
// NOTE no +"l" suffix e.g. because Dial(l.Addr()) must work
return l.entry.addr()
return l.socket.addr()
}
// Network returns full network name of this network
func (n *Network) Network() string { return NetPrefix + n.name }
// Network returns full network name of underllying network
func (h *Host) Network() string { return h.network.Network() }
......@@ -34,6 +34,8 @@ import (
// only Tx events are traced:
// - because Write, contrary to Read, never writes partial data on non-error
// - because in case of pipenet tracing writes only is enough to get whole network exchange picture
//
// XXX Dial/Listen are also traced
func NetTrace(inner Networker, tracer Tracer) Networker {
return &netTrace{inner, tracer}
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment