Commit 7ecadf55 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a41e0b4d
...@@ -332,13 +332,6 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) { ...@@ -332,13 +332,6 @@ func (stor *Storage) m1serve(ctx context.Context, Mconn *neo.Conn) (err error) {
} }
} }
// XXX naming -> withOpCtx? withUntilOperational?
func (stor *Storage) opCtxRead() context.Context {
stor.opMu.Lock()
defer stor.opMu.Unlock()
return stor.opCtx
}
// ServeLink serves incoming node-node link connection // ServeLink serves incoming node-node link connection
// XXX +error return? // XXX +error return?
func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
...@@ -395,6 +388,16 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) { ...@@ -395,6 +388,16 @@ func (stor *Storage) ServeLink(ctx context.Context, link *neo.NodeLink) {
// TODO wait all spawned serveConn // TODO wait all spawned serveConn
} }
// withWhileOperational derives new context from ctx which will be cancelled, when either
// - ctx is cancelled, or
// - master tells us to stop operational service
func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context, context.CancelFunc) {
stor.opMu.Lock()
opCtx := stor.opCtx
stor.opMu.Unlock()
return xcontext.Merge(ctx, opCtx)
}
// serveClient serves incoming connection on which peer identified itself as client // serveClient serves incoming connection on which peer identified itself as client
// XXX +error return? // XXX +error return?
...@@ -402,31 +405,43 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) { ...@@ -402,31 +405,43 @@ func (stor *Storage) serveClient(ctx context.Context, conn *neo.Conn) {
fmt.Printf("stor: %s: serving new client conn\n", conn) fmt.Printf("stor: %s: serving new client conn\n", conn)
// rederive ctx to be also cancelled if M tells us StopOperation // rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := xcontext.Merge(ctx, stor.opCtxRead()) ctx, cancel := stor.withWhileOperational(ctx)
//ctx, cancel := stor.withWhileOperational(ctx)
defer cancel() defer cancel()
// main work to serve
done := make(chan error, 1)
go func() {
for {
err := stor.serveClient1(conn)
if err != nil {
done <- err
break
}
}
}()
// close connection when either cancelling or returning (e.g. due to an error) // close connection when either cancelling or returning (e.g. due to an error)
// ( when cancelling - conn.Close will signal to current IO to // ( when cancelling - conn.Close will signal to current IO to
// terminate with an error ) // terminate with an error )
go func() { var err error
<-ctx.Done() select {
// XXX tell client if we are shutting down? case <-ctx.Done():
// XXX ret err = cancelled ? // XXX tell client we are shutting down?
fmt.Printf("stor: %v: closing client conn\n", conn) err = ctx.Err()
conn.Close() // XXX err
}()
for { case err = <-done:
serveClient1(conn) // XXX err check
} }
fmt.Printf("stor: %v: %v\n", conn, err)
fmt.Printf("stor: %v: closing client conn\n", conn)
conn.Close() // XXX err
} }
// serveClient1 serves 1 request from a client // serveClient1 serves 1 request from a client
func (stor *Storage) serveClient1(conn *neo.Conn) error { func (stor *Storage) serveClient1(conn *neo.Conn) error {
req, err := conn.Recv() req, err := conn.Recv()
if err != nil { if err != nil {
return // XXX log / err / send error before closing return err // XXX log / err / send error before closing
} }
switch req := req.(type) { switch req := req.(type) {
...@@ -481,4 +496,6 @@ func (stor *Storage) serveClient1(conn *neo.Conn) error { ...@@ -481,4 +496,6 @@ func (stor *Storage) serveClient1(conn *neo.Conn) error {
} }
//req.Put(...) //req.Put(...)
return nil
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment