Commit b1e8b102 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 0fa03444
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
// //
// See COPYING file for full licensing terms. // See COPYING file for full licensing terms.
// Package pipenet provides in-memory network of net.Pipes // Package pipenet provides synchronous in-memory network of net.Pipes
// //
// TODO describe addressing scheme // TODO describe addressing scheme
// //
...@@ -36,38 +36,69 @@ const NetPrefix = "pipe" // pipenet package works only with "pipe*" networks ...@@ -36,38 +36,69 @@ const NetPrefix = "pipe" // pipenet package works only with "pipe*" networks
var ( var (
errBadNetwork = errors.New("pipenet: invalid network") errBadNetwork = errors.New("pipenet: invalid network")
errBadAddress = errors.New("pipenet: invalid address") errBadAddress = errors.New("invalid address")
errNetNotFound = errors.New("no such network") errNetNotFound = errors.New("no such network")
errNetClosed = errors.New("network connection closed") errNetClosed = errors.New("network connection closed")
errAddrAlreadyUsed = errors.New("address already in use") errAddrAlreadyUsed = errors.New("address already in use")
errConnRefused = errors.New("connection refused") errConnRefused = errors.New("connection refused")
) )
// Addr represents address of a pipenet endpoint
type Addr struct {
network string // full network name, e.g. "pipe"
addr string // XXX -> port ? + including c/s ?
}
// Network represents network of in-memory pipes // Network implements synchronous in-memory network of pipes
// It can be worked with the same way a regular TCP network is handled with Dial/Listen/Accept/... // It can be worked with the same way a regular TCP network is handled with Dial/Listen/Accept/...
// //
// Network must be created with New // Network must be created with New
type Network struct { type Network struct {
// name of this network under "pipe" namespace -> e.g. "" // name of this network under "pipe" namespace -> e.g. ""
// full network name will be reported as "pipe"+Name // full network name will be reported as "pipe"+Name XXX -> just full name ?
Name string Name string
mu sync.Mutex mu sync.Mutex
pipev []*pipe // port -> listener + net.Pipe (?) entryv []*entry // port -> listener | (conn, conn)
// listenv []chan dialReq // listener[port] is waiting here if != nil
} }
// pipe represents one pipenet connection XXX naming // entry represents one Network entry
// it can be either already connected (2 endpoints) or only listening (1 endpoint) XXX // it can be either already connected (2 endpoints) or only listening (1 endpoint)
type pipe struct { // anything from the above becomes nil when closed
listener *listener // listener is waiting here if != nil type entry struct {
network *Network
port int
pipev [2]*conn // connection endpoints are there if != nil
listener *listener // listener is waiting here if != nil
} }
// Addr represents address of a pipenet endpoint // conn represents one endpoint of connection created under Network
type Addr struct { type conn struct {
network string // full network name, e.g. "pipe" entry *entry
addr string // XXX -> port ? + including c/s ? endpoint int // 0 | 1 -> entry.pipev
net.Conn
closeOnce sync.Once
}
// listener implements net.Listener for piped network
type listener struct {
// network/port we are listening on
entry *entry
dialq chan chan net.Conn // Dial requests to our port go here
down chan struct{} // Close -> down=ready
closeOnce sync.Once
}
// empty checks whether both 2 pipe endpoints and listener are nil
func (e *entry) empty() bool {
return e.pipev[0] == nil && e.pipev[1] == nil && e.listener == nil
} }
func (a *Addr) Network() string { return a.network } func (a *Addr) Network() string { return a.network }
...@@ -75,8 +106,6 @@ func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ...@@ -75,8 +106,6 @@ func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr
func (n *Network) netname() string { return NetPrefix + n.Name } func (n *Network) netname() string { return NetPrefix + n.Name }
// XXX do we need Conn wrapping net.Pipe ? (e.g. to override String())
func (n *Network) Listen(laddr string) (net.Listener, error) { func (n *Network) Listen(laddr string) (net.Listener, error) {
lerr := func(err error) error { lerr := func(err error) error {
...@@ -97,50 +126,51 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -97,50 +126,51 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
// find first free port if it was not specified // find first free port if it was not specified
if port < 0 { if port < 0 {
for port = 0; port < len(n.pipev); port++ { for port = 0; port < len(n.entryv); port++ {
if n.pipev[port] == nil { if n.entryv[port] == nil {
break break
} }
} }
// if all busy it exits with port == len(n.pipev) // if all busy it exits with port == len(n.entryv)
} }
// grow if needed // grow if needed
for port >= len(n.pipev) { for port >= len(n.entryv) {
n.pipev = append(n.pipev, nil) n.entryv = append(n.entryv, nil)
} }
if n.pipev[port] != nil { if n.entryv[port] != nil {
return nil, lerr(errAddrAlreadyUsed) return nil, lerr(errAddrAlreadyUsed)
} }
e := &entry{network: n, port: port}
l := &listener{ l := &listener{
network: n, entry: e,
port: port,
dialq: make(chan chan net.Conn), dialq: make(chan chan net.Conn),
down: make(chan struct{}), down: make(chan struct{}),
} }
n.pipev[port] = &pipe{listener: l} e.listener = l
n.entryv[port] = e
return l, nil return l, nil
} }
// listener implements net.Listener for piped network
type listener struct {
// network/port we are listening on
network *Network
port int
dialq chan chan net.Conn // Dial requests to our port go here
down chan struct{} // Close -> down=ready
downOnce sync.Once // so Close several times is ok
}
// Close closes the listener // Close closes the listener
// it interrupts all currently in-flight calls to Accept // it interrupts all currently in-flight calls to Accept
func (l *listener) Close() error { func (l *listener) Close() error {
l.downOnce.Do(func() { l.closeOnce.Do(func() {
close(l.down) close(l.down)
e := l.entry
n := e.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
}
}) })
return nil return nil
} }
...@@ -149,7 +179,7 @@ func (l *listener) Close() error { ...@@ -149,7 +179,7 @@ func (l *listener) Close() error {
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
select { select {
case <-l.down: case <-l.down:
return nil, &net.OpError{Op: "accept", Net: l.network.netname(), Addr: l.Addr(), Err: errNetClosed} return nil, &net.OpError{Op: "accept", Net: l.entry.network.netname(), Addr: l.Addr(), Err: errNetClosed}
case resp := <-l.dialq: case resp := <-l.dialq:
// someone dialed us - let's connect // someone dialed us - let's connect
...@@ -177,15 +207,15 @@ func (n *Network) Dial(addr string) (net.Conn, error) { ...@@ -177,15 +207,15 @@ func (n *Network) Dial(addr string) (net.Conn, error) {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() // XXX ok to defer here? defer n.mu.Unlock() // XXX ok to defer here?
if port >= len(n.pipev) { if port >= len(n.entryv) {
return nil, derr(errConnRefused) // XXX merge with vvv return nil, derr(errConnRefused) // XXX merge with vvv
} }
p := n.pipev[port] e := n.entryv[port]
if p == nil || p.listener == nil { if e == nil || e.listener == nil {
return nil, derr(errConnRefused) // XXX merge with ^^^ return nil, derr(errConnRefused) // XXX merge with ^^^
} }
l := p.listener l := e.listener
// NOTE listener is not locking n.mu -> it is ok to send/receive under mu - FIXME not correct // NOTE listener is not locking n.mu -> it is ok to send/receive under mu - FIXME not correct
// FIXME -> Accept needs to register new connection under n.mu // FIXME -> Accept needs to register new connection under n.mu
...@@ -201,22 +231,37 @@ func (n *Network) Dial(addr string) (net.Conn, error) { ...@@ -201,22 +231,37 @@ func (n *Network) Dial(addr string) (net.Conn, error) {
// Addr returns address where listener is accepting incoming connections // Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr { func (l *listener) Addr() net.Addr {
return &Addr{network: l.network.netname(), addr: fmt.Sprintf("%d", l.port)} // NOTE no c/s XXX -> +l ? e := l.entry
n := e.network
return &Addr{network: n.netname(), addr: fmt.Sprintf("%d", e.port)} // NOTE no c/s XXX -> +l ?
} }
// XXX conn.Close - unregister from network.connv
// XXX conn.LocalAddr -> ...
// XXX conn.RemoteAddr -> ...
func (c *conn) Close() (err error) {
c.closeOnce.Do(func() {
err = c.Conn.Close()
e := c.entry
n := e.network
// conn represents one endpoint of connection created under Network n.mu.Lock()
type conn struct { defer n.mu.Unlock()
network *Network
// XXX port + c/s ?
net.Conn e.pipev[c.endpoint] = nil
if e.empty() {
n.entryv[e.port] = nil
}
})
return err
} }
// XXX conn.Close - unregister from network.connv
// XXX conn.LocalAddr -> ...
// XXX conn.RemoteAddr -> ...
// ---------------------------------------- // ----------------------------------------
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment