Commit b370f152 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6a360785
...@@ -25,14 +25,19 @@ package pipenet ...@@ -25,14 +25,19 @@ package pipenet
import ( import (
"errors" "errors"
"fmt"
"net" "net"
"strconv"
"strings"
"sync"
) )
const NetPrefix = "pipe" // pipenet package works only with "pipe*" networks const NetPrefix = "pipe" // pipenet package works only with "pipe*" networks
var errBadNetwork = errors.New("pipenet: invalid network")
var errBadAddress = errors.New("pipenet: invalid address")
var errNetNotFound = errors.New("no such network") var errNetNotFound = errors.New("no such network")
//var errBadNetwork = errors.New("invalid network") var errNetClosed = errors.New("network connection closed")
var errBadAddress = errors.New("invalid address")
var errAddrAlreadyUsed = errors.New("address already in use") var errAddrAlreadyUsed = errors.New("address already in use")
var errConnRefused = errors.New("connection refused") var errConnRefused = errors.New("connection refused")
...@@ -47,25 +52,25 @@ type Network struct { ...@@ -47,25 +52,25 @@ type Network struct {
Name string Name string
mu sync.Mutex mu sync.Mutex
pipev []pipe // port -> listener + net.Pipe (?) pipev []*pipe // port -> listener + net.Pipe (?)
// listenv []chan dialReq // listener[port] is waiting here if != nil // listenv []chan dialReq // listener[port] is waiting here if != nil
} }
// pipe represents one pipenet connection // pipe represents one pipenet connection XXX naming
// it can be either already connected (2 endpoints) or only listening (1 endpoint) XXX // it can be either already connected (2 endpoints) or only listening (1 endpoint) XXX
type pipe struct { type pipe struct {
// TODO listener *listener // listener is waiting here if != nil
} }
// Addr represents address of a pipe endpoint // Addr represents address of a pipe endpoint
type Addr struct { type Addr struct {
network *Network network string // full network name, e.g. "pipe"
addr string // XXX -> port ? + including c/s ? addr string // XXX -> port ? + including c/s ?
} }
var _ net.Addr = (*Addr)(nil) var _ net.Addr = (*Addr)(nil)
func (a *Addr) Network() string { return a.network.netname() } func (a *Addr) Network() string { return a.network }
func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ? func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ?
func (n *Network) netname() string { return NetPrefix + n.Name } func (n *Network) netname() string { return NetPrefix + n.Name }
...@@ -75,7 +80,7 @@ func (n *Network) netname() string { return NetPrefix + n.Name } ...@@ -75,7 +80,7 @@ func (n *Network) netname() string { return NetPrefix + n.Name }
func (n *Network) Listen(laddr string) (net.Listener, error) { func (n *Network) Listen(laddr string) (net.Listener, error) {
lerr := func(err error) error { lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: n.netname(), Addr: laddr, Err: err} return &net.OpError{Op: "listen", Net: n.netname(), Addr: &Addr{n.netname(), laddr}, Err: err}
} }
// laddr must be empty or int >= 0 // laddr must be empty or int >= 0
...@@ -83,59 +88,66 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -83,59 +88,66 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
if laddr != "" { if laddr != "" {
port, err := strconv.Atoi(laddr) port, err := strconv.Atoi(laddr)
if err != nil || port < 0 { if err != nil || port < 0 {
return lerr(errBadAddress) return nil, lerr(errBadAddress)
} }
} }
n.mu.Lock() n.mu.Lock()
defer n.m.Unlock() defer n.mu.Unlock()
// find first free port if it was not specified // find first free port if it was not specified
if port < 0 { if port < 0 {
for port = 0; port < len(n.listenv); port++ { for port = 0; port < len(n.pipev); port++ {
if n.listenv[port] == nil { if n.pipev[port] == nil {
break break
} }
} }
// if all busy it exits with port == len(n.listenv) // if all busy it exits with port == len(n.pipev)
} }
// grow if needed // grow if needed
for port >= len(n.listenv) { for port >= len(n.pipev) {
n.listenv = append(n.listenv, nil) n.pipev = append(n.pipev, nil)
} }
if n.listenv[port] != nil { if n.pipev[port] != nil {
return lerr(errAddrAlreadyUsed) return nil, lerr(errAddrAlreadyUsed)
} }
l := listener{...} l := &listener{
n.listenv[port] = l... network: n,
port: port,
dialq: make(chan chan connected),
acceptq: make(chan chan connected),
down: make(chan struct{}),
}
n.pipev[port] = &pipe{listener: l}
return &l return l, nil
} }
// listener implements net.Listener for piped network // listener implements net.Listener for piped network
type listener struct { type listener struct {
network *Network // XXX needed ? network *Network // XXX needed ?
port int // port we are listening on port int // port we are listening on XXX needed ?
dialq chan chan dialResp // Dial requests to our port go here dialq chan chan connected // Dial requests to our port go here
acceptq chan chan acceptResp // Accept requests go here acceptq chan chan connected // Accept requests go here
down chan struct{} // Close -> down=ready down chan struct{} // Close -> down=ready
downOnce sync.Once // so Close several times is ok downOnce sync.Once // so Close several times is ok
} }
var _ net.Listener = (*listener)nil var _ net.Listener = (*listener)(nil)
func (l *listener) Addr() Addr { func (l *listener) Addr() net.Addr {
return &Addr{net: l.net, addr: fmt.Sprintf("%d", l.port} // NOTE no c/s XXX -> +l ? return &Addr{network: l.network.netname(), addr: fmt.Sprintf("%d", l.port)} // NOTE no c/s XXX -> +l ?
} }
func (l *listener) Close() error { func (l *listener) Close() error {
l.downOnce.Do(func() { l.downOnce.Do(func() {
close(l.down) close(l.down)
}) })
return nil
} }
// connected is response from listener to Dial and Accept // connected is response from listener to Dial and Accept
...@@ -147,8 +159,8 @@ type connected struct { ...@@ -147,8 +159,8 @@ type connected struct {
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
ch := make(chan connected) ch := make(chan connected)
select { select {
case l.down case <-l.down:
return ... "... closed" // XXX return nil, &net.OpError{Op: "accept", Net: l.network.netname(), Addr: l.Addr(), Err: errNetClosed}
case l.acceptq <- ch: case l.acceptq <- ch:
resp := <-ch resp := <-ch
...@@ -156,33 +168,34 @@ func (l *listener) Accept() (net.Conn, error) { ...@@ -156,33 +168,34 @@ func (l *listener) Accept() (net.Conn, error) {
} }
} }
func (n *Network) Dial(network, addr string) (net.Conn, error) { func (n *Network) Dial(addr string) (net.Conn, error) {
derr := func(err error) error { derr := func(err error) error {
return &net.OpError{Op: "dial", Net: network, Addr: addr, Err: err} return &net.OpError{Op: "dial", Net: n.netname(), Addr: &Addr{n.netname(), addr}, Err: err}
} }
port, err := strconv.Atoi(addr) port, err := strconv.Atoi(addr)
if err != nil || port < 0 { if err != nil || port < 0 {
return derr(errBadAddress) return nil, derr(errBadAddress)
} }
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() // XXX ok to defer here? defer n.mu.Unlock() // XXX ok to defer here?
if port >= len(n.listenv) { if port >= len(n.pipev) {
return derr(errConnRefused) // XXX merge with vvv return nil, derr(errConnRefused) // XXX merge with vvv
} }
l := n.listenv[port] p := n.pipev[port]
if l == nil { if p == nil || p.listener == nil {
return derr(errConnRefused) // XXX merge with ^^^ return nil, derr(errConnRefused) // XXX merge with ^^^
} }
l := p.listener
// NOTE listener is not locking n.mu -> it is ok to send/receive under mu // NOTE listener is not locking n.mu -> it is ok to send/receive under mu
ch := make(chan dialResp) ch := make(chan connected)
select { select {
case l.down: case <-l.down:
return derr(errConnRefused) return nil, derr(errConnRefused)
case l.dialq <- ch: case l.dialq <- ch:
resp := <-ch resp := <-ch
...@@ -210,7 +223,7 @@ func New(name string) *Network { ...@@ -210,7 +223,7 @@ func New(name string) *Network {
_, already := networks[name] _, already := networks[name]
if already { if already {
panic(fmt.Errorf("pipenet %v already registered", name) panic(fmt.Errorf("pipenet %v already registered", name))
} }
n := &Network{Name: name} n := &Network{Name: name}
...@@ -219,31 +232,39 @@ func New(name string) *Network { ...@@ -219,31 +232,39 @@ func New(name string) *Network {
} }
// lookupNet lookups Network by name // lookupNet lookups Network by name
// returns nil, if not found func lookupNet(name string) (*Network, error) {
func lookupNet(name string) *Network { if !strings.HasPrefix(NetPrefix, name) {
return nil, errBadNetwork
}
netMu.Lock() netMu.Lock()
defer netMu.Unlock() defer netMu.Unlock()
return networks[name] n := networks[strings.TrimPrefix(NetPrefix, name)]
if n == nil {
return nil, errNetNotFound
}
return n, nil
} }
// Dial dials addr on a pipenet // Dial dials addr on a pipenet
// network should be valid registered pipe network name // network should be full valid registered pipe network name, e.g. "pipe"
func Dial(network, addr string) (net.Conn, error) { func Dial(network, addr string) (net.Conn, error) {
n := lookupNet(network) n, err := lookupNet(network)
if n == nil { if err != nil {
return nil, &net.OpError{Op: "dial", Net: NetPrefix + network, Addr: addr, Err: err} return nil, &net.OpError{Op: "dial", Net: network, Addr: &Addr{network, addr}, Err: err}
} }
return n.Dial(addr) return n.Dial(addr)
} }
// XXX text // Listen starts listening on a pipenet address
func Listen(network, laddr string) (net.Listen, error) { // network should be full valid registered pipe network name, e.g. "pipe"
n := lookupNet(network) func Listen(network, laddr string) (net.Listener, error) {
if n == nil { n, err := lookupNet(network)
return nil, &net.OpError{Op: "listen", Net: NetPrefix + network, Addr: addr, Err: err} if err != nil {
return nil, &net.OpError{Op: "listen", Net: network, Addr: &Addr{network, laddr}, Err: err}
} }
return n.Listen(netw, laddr) return n.Listen(laddr)
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment