Commit c1e40bfe authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 04b2b074
...@@ -46,7 +46,7 @@ var ( ...@@ -46,7 +46,7 @@ var (
// Addr represents address of a pipenet endpoint // Addr represents address of a pipenet endpoint
type Addr struct { type Addr struct {
network string // full network name, e.g. "pipe" network string // full network name, e.g. "pipe"
addr string // XXX -> port ? + including c/s ? addr string // port + c/s depending on connection endpoint
} }
// Network implements synchronous in-memory network of pipes // Network implements synchronous in-memory network of pipes
...@@ -95,6 +95,27 @@ type listener struct { ...@@ -95,6 +95,27 @@ type listener struct {
} }
// allocFreeEntry finds first free port and allocate network entry for it
// must be called under .mu held
func (n *Network) allocFreeEntry() *entry {
// find first free port if it was not specified
port := 0
for ; port < len(n.entryv); port++ {
if n.entryv[port] == nil {
break
}
}
// if all busy it exits with port == len(n.entryv)
// grow if needed
for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil)
}
e := &entry{network: n, port: port}
n.entryv[port] = e
return e
}
// empty checks whether both 2 pipe endpoints and listener are nil // empty checks whether both 2 pipe endpoints and listener are nil
func (e *entry) empty() bool { func (e *entry) empty() bool {
...@@ -111,7 +132,30 @@ func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr ...@@ -111,7 +132,30 @@ func (a *Addr) String() string { return a.addr } // XXX Network() + ":" + a.addr
func (n *Network) netname() string { return NetPrefix + n.Name } func (n *Network) netname() string { return NetPrefix + n.Name }
// Close closes the listener
// it interrupts all currently in-flight calls to Accept
func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
e := l.entry
n := e.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
}
})
return nil
}
// Listen starts new listener
// Ct either allocates new port if laddr is "" or binds to laddr.
// Once listener is started Dials could connect to listener address.
// Connection requests created by Dials could be accepted via Accept.
func (n *Network) Listen(laddr string) (net.Listener, error) { func (n *Network) Listen(laddr string) (net.Listener, error) {
lerr := func(err error) error { lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: n.netname(), Addr: &Addr{n.netname(), laddr}, Err: err} return &net.OpError{Op: "listen", Net: n.netname(), Addr: &Addr{n.netname(), laddr}, Err: err}
...@@ -129,16 +173,14 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -129,16 +173,14 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
n.mu.Lock() n.mu.Lock()
defer n.mu.Unlock() defer n.mu.Unlock()
var e *entry
// find first free port if it was not specified // find first free port if it was not specified
if port < 0 { if port < 0 {
for port = 0; port < len(n.entryv); port++ { e = n.allocFreeEntry()
if n.entryv[port] == nil {
break
}
}
// if all busy it exits with port == len(n.entryv)
}
// else we check whether address is already used and if not allocate entry in-place
} else {
// grow if needed // grow if needed
for port >= len(n.entryv) { for port >= len(n.entryv) {
n.entryv = append(n.entryv, nil) n.entryv = append(n.entryv, nil)
...@@ -148,50 +190,41 @@ func (n *Network) Listen(laddr string) (net.Listener, error) { ...@@ -148,50 +190,41 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
return nil, lerr(errAddrAlreadyUsed) return nil, lerr(errAddrAlreadyUsed)
} }
e := &entry{network: n, port: port} e = &entry{network: n, port: port}
n.entryv[port] = e
}
// create listener under entry
l := &listener{ l := &listener{
entry: e, entry: e,
dialq: make(chan chan net.Conn), dialq: make(chan chan net.Conn),
down: make(chan struct{}), down: make(chan struct{}),
} }
e.listener = l e.listener = l
n.entryv[port] = e
return l, nil return l, nil
} }
// Close closes the listener
// it interrupts all currently in-flight calls to Accept
func (l *listener) Close() error {
l.closeOnce.Do(func() {
close(l.down)
e := l.entry
n := e.network
n.mu.Lock()
defer n.mu.Unlock()
e.listener = nil
if e.empty() {
n.entryv[e.port] = nil
}
})
return nil
}
// Accept tries to connect to Dial called with addr corresponding to our listener // Accept tries to connect to Dial called with addr corresponding to our listener
func (l *listener) Accept() (net.Conn, error) { func (l *listener) Accept() (net.Conn, error) {
n := l.entry.network
select { select {
case <-l.down: case <-l.down:
return nil, &net.OpError{Op: "accept", Net: l.entry.network.netname(), Addr: l.Addr(), Err: errNetClosed} return nil, &net.OpError{Op: "accept", Net: n.netname(), Addr: l.Addr(), Err: errNetClosed}
case resp := <-l.dialq: case resp := <-l.dialq:
// someone dialed us - let's connect // someone dialed us - let's connect
pc, ps := net.Pipe() pc, ps := net.Pipe()
// XXX allocate port and register to l.network.pipev // allocate entry and register conns to Network under it
n.mu.Lock()
e := n.allocFreeEntry()
e.pipev[0] = &conn{entry: e, endpoint: 0, Conn: pc}
e.pipev[1] = &conn{entry: e, endpoint: 1, Conn: ps}
n.mu.Unlock()
resp <- pc resp <- pc
return ps, nil return ps, nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment