Commit 9e223d74 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c867321f
...@@ -63,7 +63,7 @@ func (n netPlain) Listen(laddr string) (net.Listener, error) { ...@@ -63,7 +63,7 @@ func (n netPlain) Listen(laddr string) (net.Listener, error) {
} }
// NetPipe creates Network corresponding to in-memory pipenet // NetPipe creates Network corresponding to in-memory pipenet
// name is anything valid according to pipenet.New rules // name is passed directly to pipenet.New
func NetPipe(name string) Network { func NetPipe(name string) Network {
return pipenet.New(name) return pipenet.New(name)
} }
...@@ -98,32 +98,18 @@ func (n *netTLS) Listen(laddr string) (net.Listener, error) { ...@@ -98,32 +98,18 @@ func (n *netTLS) Listen(laddr string) (net.Listener, error) {
// ---------------------------------------- // ----------------------------------------
// String formats Address to networked address string // Addr converts net.Addr into NEO Address
func (addr Address) String() string { // TODO make neo.Address just string without host:port split
// XXX in py if .Host == "" -> whole Address is assumed to be empty func Addr(addr net.Addr) (Address, error) {
// e.g. on unix, pipenet, etc there is no host/port split - the address
// is single string which we put into .Host and set .Port=0
switch addr.Port {
case 0:
return addr.Host
default:
return net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
}
}
// ParseAddress parses networked address (XXX of form host:port) into NEO Address
//func ParseAddress(addr string) (Address, error) {
func ConvAddress(addr net.Addr) (Address, error) {
addrstr := addr.String() addrstr := addr.String()
// e.g. on unix, pipenet, etc networks there is no host/port split - the address // e.g. on unix, pipenet, etc networks there is no host/port split - the address there
// is single string which we put into .Host and set .Port=0 to indicate such cases // is single string -> we put it into .Host and set .Port=0 to indicate such cases
switch addr.Network() { switch addr.Network() {
default: default:
return Address{Host: addrstr, Port: 0}, nil return Address{Host: addrstr, Port: 0}, nil
// networks that have host:port split
case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6": case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6":
host, portstr, err := net.SplitHostPort(addrstr) host, portstr, err := net.SplitHostPort(addrstr)
if err != nil { if err != nil {
...@@ -138,3 +124,17 @@ func ConvAddress(addr net.Addr) (Address, error) { ...@@ -138,3 +124,17 @@ func ConvAddress(addr net.Addr) (Address, error) {
return Address{Host: host, Port: uint16(port)}, nil return Address{Host: host, Port: uint16(port)}, nil
} }
} }
// String formats Address to networked address string
func (addr Address) String() string {
// XXX in py if .Host == "" -> whole Address is assumed to be empty
// see Addr ^^^ about .Port=0 meaning no host:port split was applied
switch addr.Port {
case 0:
return addr.Host
default:
return net.JoinHostPort(addr.Host, fmt.Sprintf("%d", addr.Port))
}
}
...@@ -81,7 +81,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -81,7 +81,7 @@ func (stor *Storage) Run(ctx context.Context) error {
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and // NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234 // listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty // -> host is never empty
addr, err := neo.ParseAddress(l.Addr().String()) addr, err := neo.Addr(l.Addr())
if err != nil { if err != nil {
// XXX -> panic here ? // XXX -> panic here ?
return err // XXX err ctx return err // XXX err ctx
......
...@@ -17,9 +17,26 @@ ...@@ -17,9 +17,26 @@
// Package pipenet provides synchronous in-memory network of net.Pipes // Package pipenet provides synchronous in-memory network of net.Pipes
// //
// TODO describe addressing scheme // It can be worked with the same way a regular TCP network is used with
// Dial/Listen/Accept/...
// //
// it might be handy for testing interaction in networked applications in 1 // Addresses on pipenet are numbers, indicating serial number of a pipe
// used, plus "c"/"s" suffix depending on whether pipe endpoint was created via
// Dial or Accept.
//
// Address of a listener is just number, which will be in turn used for
// corresponding Dial/Accept as prefix.
//
// Example:
//
// net := pipenet.New("")
// l, err := net.Listen("10") // starts listening on address "10"
// go func() {
// csrv, err := l.Accept() // csrv will have LocalAddr "10s"
// }()
// ccli, err := net.Dial("10") // ccli will have LocalAddr "10c"
//
// Pipenet might be handy for testing interaction of networked applications in 1
// process without going to OS networking stack. // process without going to OS networking stack.
package pipenet package pipenet
...@@ -29,16 +46,13 @@ import ( ...@@ -29,16 +46,13 @@ import (
"fmt" "fmt"
"net" "net"
"strconv" "strconv"
"strings"
"sync" "sync"
) )
const NetPrefix = "pipe" // pipenet package works only with "pipe*" networks const NetPrefix = "pipe" // pipenet package creates only "pipe*" networks
var ( var (
// errBadNetwork = errors.New("pipenet: invalid network")
errBadAddress = errors.New("invalid address") errBadAddress = errors.New("invalid address")
// errNetNotFound = errors.New("no such network")
errNetClosed = errors.New("network connection closed") errNetClosed = errors.New("network connection closed")
errAddrAlreadyUsed = errors.New("address already in use") errAddrAlreadyUsed = errors.New("address already in use")
errConnRefused = errors.New("connection refused") errConnRefused = errors.New("connection refused")
...@@ -51,11 +65,10 @@ type Addr struct { ...@@ -51,11 +65,10 @@ type Addr struct {
} }
// Network implements synchronous in-memory network of pipes // Network implements synchronous in-memory network of pipes
// It can be worked with the same way a regular TCP network is handled with Dial/Listen/Accept/...
type Network struct { type Network struct {
// name of this network under "pipe" namespace -> e.g. "" // name of this network under "pipe" namespace -> e.g. ""
// full network name will be reported as "pipe"+Name XXX -> just full name ? // full network name will be reported as "pipe"+name
Name string name string
mu sync.Mutex mu sync.Mutex
entryv []*entry // port -> listener | (conn, conn) entryv []*entry // port -> listener | (conn, conn)
...@@ -93,71 +106,23 @@ type listener struct { ...@@ -93,71 +106,23 @@ type listener struct {
closeOnce sync.Once closeOnce sync.Once
} }
// ----------------------------------------
// allocFreeEntry finds first free port and allocates network entry for it // New creates new pipenet Network
// must be called with .mu held // name is name of this network under "pipe" namespace, e.g. "α" will give full network name "pipeα".
func (n *Network) allocFreeEntry() *entry { //
// find first free port if it was not specified // New does not check whether network name provided is unique.
port := 0 func New(name string) *Network {
for ; port < len(n.entryv); port++ { return &Network{name: name}
if n.entryv[port] == nil {
break
}
}
// if all busy it exits with port == len(n.entryv)
// grow if needed
for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil)
}
e := &entry{network: n, port: port}
n.entryv[port] = e
return e
}
// empty checks whether both 2 pipe endpoints and listener are nil
func (e *entry) empty() bool {
return e.pipev[0] == nil && e.pipev[1] == nil && e.listener == nil
}
// addr returns address corresponding to entry
func (e *entry) addr() *Addr {
return &Addr{network: e.network.netname(), addr: fmt.Sprintf("%d", e.port)}
}
func (a *Addr) Network() string { return a.network }
func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ?
func (n *Network) netname() string { return NetPrefix + n.Name }
// Close closes the listener
// it interrupts all currently in-flight calls to Accept
func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
e := l.entry
n := e.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
}
})
return nil
} }
// Listen starts new listener // Listen starts new listener
// It either allocates free port if laddr is "" or binds to laddr. // It either allocates free port if laddr is "", or binds to laddr.
// Once listener is started Dials could connect to listening address. // Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept. // Connection requests created by Dials could be accepted via Accept.
func (n *Network) Listen(laddr string) (net.Listener, error) { func (n *Network) Listen(laddr string) (net.Listener, error) {
lerr := func(err error) error { lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: n.netname(), Addr: &Addr{n.netname(), laddr}, Err: err} return &net.OpError{Op: "listen", Net: n.Name(), Addr: &Addr{n.Name(), laddr}, Err: err}
} }
// laddr must be empty or int >= 0 // laddr must be empty or int >= 0
...@@ -204,13 +169,33 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -204,13 +169,33 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
return l, nil return l, nil
} }
// Close closes the listener
// it interrupts all currently in-flight calls to Accept
func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
e := l.entry
n := e.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
}
})
return nil
}
// Accept tries to connect to Dial called with addr corresponding to our listener // Accept tries to connect to Dial called with addr corresponding to our listener
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
n := l.entry.network n := l.entry.network
select { select {
case <-l.down: case <-l.down:
return nil, &net.OpError{Op: "accept", Net: n.netname(), Addr: l.Addr(), Err: errNetClosed} return nil, &net.OpError{Op: "accept", Net: n.Name(), Addr: l.Addr(), Err: errNetClosed}
case resp := <-l.dialq: case resp := <-l.dialq:
// someone dialed us - let's connect // someone dialed us - let's connect
...@@ -234,7 +219,7 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -234,7 +219,7 @@ func (l *listener) Accept() (net.Conn, error) {
// It tries to connect to Accept called on listener corresponding to addr. // It tries to connect to Accept called on listener corresponding to addr.
func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) { func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
derr := func(err error) error { derr := func(err error) error {
return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err} return &net.OpError{Op: "dial", Net: n.Name(), Addr: &Addr{n.Name(), addr}, Err: err}
} }
port, err := strconv.Atoi(addr) port, err := strconv.Atoi(addr)
...@@ -272,14 +257,8 @@ func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) { ...@@ -272,14 +257,8 @@ func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
} }
} }
// Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
// NOTE no +"l" suffix e.g. because Dial(l.Addr()) must work
return l.entry.addr()
}
// Close closes pipe endpoint and unregisters conn from Network // Close closes pipe endpoint and unregisters conn from Network
// All currently in-flight blocking IO is interuppted with an error // All currently in-flight blocked IO is interrupted with an error
func (c *conn) Close() (err error) { func (c *conn) Close() (err error) {
c.closeOnce.Do(func() { c.closeOnce.Do(func() {
err = c.Conn.Close() err = c.Conn.Close()
...@@ -318,71 +297,48 @@ func (c *conn) RemoteAddr() net.Addr { ...@@ -318,71 +297,48 @@ func (c *conn) RemoteAddr() net.Addr {
} }
// ---------------------------------------- // ----------------------------------------
/* // allocFreeEntry finds first free port and allocates network entry for it
var ( // must be called with .mu held
netMu sync.Mutex func (n *Network) allocFreeEntry() *entry {
networks = map[string]*Network{} // netSuffix -> Network // find first free port if it was not specified
port := 0
DefaultNet = New("") for ; port < len(n.entryv); port++ {
) if n.entryv[port] == nil {
break
// New creates, initializes and returns new pipenet Network
// network name is name of this network under "pipe" namesapce, e.g. ""
// network name must be unique - if not New will panic
func New(name string) *Network {
netMu.Lock()
defer netMu.Unlock()
_, already := networks[name]
if already {
panic(fmt.Errorf("pipenet %q already registered", name))
} }
n := &Network{Name: name}
networks[name] = n
return n
}
// lookupNet lookups Network by name
// name is full network name, e.g. "pipe"
func lookupNet(name string) (*Network, error) {
if !strings.HasPrefix(name, NetPrefix) {
return nil, errBadNetwork
} }
// if all busy it exits with port == len(n.entryv)
netMu.Lock() // grow if needed
defer netMu.Unlock() for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil)
n := networks[strings.TrimPrefix(name, NetPrefix)]
if n == nil {
return nil, errNetNotFound
} }
return n, nil
e := &entry{network: n, port: port}
n.entryv[port] = e
return e
} }
// Dial dials addr on a pipenet // empty checks whether entry's both 2 pipe endpoints and listener are all nil
// network should be full network name, e.g. "pipe" func (e *entry) empty() bool {
func Dial(ctx context.Context, network, addr string) (net.Conn, error) { return e.pipev[0] == nil && e.pipev[1] == nil && e.listener == nil
n, err := lookupNet(network) }
if err != nil {
return nil, &net.OpError{Op: "dial", Net: network, Addr: &Addr{network, addr}, Err: err}
}
return n.Dial(ctx, addr) // addr returns address corresponding to entry
func (e *entry) addr() *Addr {
return &Addr{network: e.network.Name(), addr: fmt.Sprintf("%d", e.port)}
} }
// Listen starts listening on a pipenet address func (a *Addr) Network() string { return a.network }
// network should be full network name, e.g. "pipe" func (a *Addr) String() string { return a.addr }
func Listen(network, laddr string) (net.Listener, error) {
n, err := lookupNet(network)
if err != nil {
return nil, &net.OpError{Op: "listen", Net: network, Addr: &Addr{network, laddr}, Err: err}
}
return n.Listen(laddr) // Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
// NOTE no +"l" suffix e.g. because Dial(l.Addr()) must work
return l.entry.addr()
} }
*/
// Name returns full network name of this network
func (n *Network) Name() string { return NetPrefix + n.name }
...@@ -33,8 +33,12 @@ import ( ...@@ -33,8 +33,12 @@ import (
// we assume net.Pipe works ok; here we only test Listen/Accept/Dial routing // we assume net.Pipe works ok; here we only test Listen/Accept/Dial routing
// XXX tests are ugly, non-robust and small coverage // XXX tests are ugly, non-robust and small coverage
func xlisten(network, laddr string) net.Listener { type mklistener interface {
l, err := Listen(network, laddr) Listen(string) (net.Listener, error)
}
func xlisten(n mklistener, laddr string) net.Listener {
l, err := n.Listen(laddr)
exc.Raiseif(err) exc.Raiseif(err)
return l return l
} }
...@@ -45,8 +49,12 @@ func xaccept(l net.Listener) net.Conn { ...@@ -45,8 +49,12 @@ func xaccept(l net.Listener) net.Conn {
return c return c
} }
func xdial(network, addr string) net.Conn { type dialer interface {
c, err := Dial(context.Background(), network, addr) Dial(context.Context, string) (net.Conn, error)
}
func xdial(n dialer, addr string) net.Conn {
c, err := n.Dial(context.Background(), addr)
exc.Raiseif(err) exc.Raiseif(err)
return c return c
} }
...@@ -78,15 +86,12 @@ func assertEq(t *testing.T, a, b interface{}) { ...@@ -78,15 +86,12 @@ func assertEq(t *testing.T, a, b interface{}) {
func TestPipeNet(t *testing.T) { func TestPipeNet(t *testing.T) {
New("α") pnet := New("α")
_, err := Dial(context.Background(), "α", "0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "α", Addr: &Addr{"α", "0"}, Err: errBadNetwork})
_, err = Dial(context.Background(), "pipeα", "0") _, err := pnet.Dial(context.Background(), "0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", "0"}, Err: errConnRefused}) assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", "0"}, Err: errConnRefused})
l1 := xlisten("pipeα", "") l1 := xlisten(pnet, "")
assertEq(t, l1.Addr(), &Addr{"pipeα", "0"}) assertEq(t, l1.Addr(), &Addr{"pipeα", "0"})
// XXX -> use workGroup (in connection_test.go) // XXX -> use workGroup (in connection_test.go)
...@@ -109,14 +114,14 @@ func TestPipeNet(t *testing.T) { ...@@ -109,14 +114,14 @@ func TestPipeNet(t *testing.T) {
}) })
}) })
c1c := xdial("pipeα", "0") c1c := xdial(pnet, "0")
assertEq(t, c1c.LocalAddr(), &Addr{"pipeα", "1c"}) assertEq(t, c1c.LocalAddr(), &Addr{"pipeα", "1c"})
assertEq(t, c1c.RemoteAddr(), &Addr{"pipeα", "1s"}) assertEq(t, c1c.RemoteAddr(), &Addr{"pipeα", "1s"})
xwrite(c1c, "ping") xwrite(c1c, "ping")
assertEq(t, xread(c1c), "pong") assertEq(t, xread(c1c), "pong")
c2c := xdial("pipeα", "0") c2c := xdial(pnet, "0")
assertEq(t, c2c.LocalAddr(), &Addr{"pipeα", "2c"}) assertEq(t, c2c.LocalAddr(), &Addr{"pipeα", "2c"})
assertEq(t, c2c.RemoteAddr(), &Addr{"pipeα", "2s"}) assertEq(t, c2c.RemoteAddr(), &Addr{"pipeα", "2s"})
...@@ -125,6 +130,6 @@ func TestPipeNet(t *testing.T) { ...@@ -125,6 +130,6 @@ func TestPipeNet(t *testing.T) {
xwait(wg) xwait(wg)
l2 := xlisten("pipeα", "") l2 := xlisten(pnet, "")
assertEq(t, l2.Addr(), &Addr{"pipeα", "3"}) assertEq(t, l2.Addr(), &Addr{"pipeα", "3"})
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment