Commit df5aca5b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9e1c3858
...@@ -823,6 +823,33 @@ func (l *linkListener) Addr() net.Addr { ...@@ -823,6 +823,33 @@ func (l *linkListener) Addr() net.Addr {
return l.l.Addr() return l.l.Addr()
} }
/*
XXX do if this is needed in a second place besides talkMaster1
// ---- Listen for single Conn over NodeLink use-cases ----
// XXX
func ListenSingleConn(link *NodeLink) ConnListener {
l := &listen1conn{link}
// XXX go ...
return l
}
// ConnListener XXX ...
type ConnListener interface {
// XXX +Close, Addr ?
// Accept returns new connection multiplexed over NodeLink
Accept() (*Conn, error)
}
type listen1conn struct {
link *NodeLink
}
func ...
*/
// ---- for convenience: Conn -> NodeLink & local/remote link addresses ---- // ---- for convenience: Conn -> NodeLink & local/remote link addresses ----
// LocalAddr returns local address of the underlying link to peer. // LocalAddr returns local address of the underlying link to peer.
......
...@@ -202,18 +202,21 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -202,18 +202,21 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// accept next connection from master. only 1 connection is served at any given time. // accept next connection from master. only 1 connection is served at any given time.
// every new connection from master means talk over previous connection is cancelled. // every new connection from master means talk over previous connection is cancelled.
// XXX recheck compatibility with py // XXX recheck compatibility with py
acceptq := make(chan *neo.Conn) type accepted struct {conn *neo.Conn; err error}
acceptq := make(chan accepted)
go func () { go func () {
for { for {
conn, err := Mlink.Accept() conn, err := Mlink.Accept()
if err != nil {
log.Error(ctx, err)
return
}
select { select {
case acceptq <- conn: case acceptq <- accepted{conn, err}:
case <-retch: case <-retch:
return
}
if err != nil {
log.Error(ctx, err)
return
} }
} }
}() }()
...@@ -222,9 +225,15 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -222,9 +225,15 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
talkq := make(chan error, 1) talkq := make(chan error, 1)
for { for {
// wait for next connection from master if talk over previous one finished. // wait for next connection from master if talk over previous one finished.
// XXX rafactor all this into SingleTalker ? (XXX ServeSingle ?)
if Mconn == nil { if Mconn == nil {
select { select {
case Mconn = <-acceptq: case a := <-acceptq:
if a.err != nil {
return a.err
}
Mconn = a.conn
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
} }
...@@ -251,10 +260,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) { ...@@ -251,10 +260,13 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// next connection / talk finished / cancel // next connection / talk finished / cancel
select { select {
case conn := <-acceptq: case a := <-acceptq:
lclose(ctx, Mconn) // wakeup/cancel current talk lclose(ctx, Mconn) // wakeup/cancel current talk
<-talkq // wait till it finish <-talkq // wait till it finish
Mconn = conn // proceed next cycle on accepted conn if a.err != nil {
return a.err
}
Mconn = a.conn // proceed next cycle on accepted conn
case err = <-talkq: case err = <-talkq:
// XXX check for shutdown command // XXX check for shutdown command
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment