Commit 1b4ec3ff authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9f4acd8d
......@@ -153,7 +153,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
return err // XXX err ctx
}
defer running(&ctx, "master(%v)", l.Addr())(&err)
defer runningf(&ctx, "master(%v)", l.Addr())(&err)
log.Info(ctx, "serving ...")
m.node.MasterAddr = l.Addr().String()
......@@ -179,7 +179,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
case <-serveCtx.Done():
// shutdown
conn.Link().Close() // XXX log err ?
lclose(serveCtx, conn.Link())
return
}
}
......@@ -189,7 +189,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
err = m.runMain(ctx)
serveCancel()
l.Close() // XXX log err ?
lclose(ctx, l)
wg.Wait()
return err
......@@ -299,10 +299,10 @@ loop:
return
}
err := m.accept(n.conn, resp)
err := m.accept(ctx, n.conn, resp)
if err != nil {
// XXX move this m.nodeLeave <- to accept() ?
recovery <- storRecovery{node: node, err: err}
recovery <- storRecovery{stor: node, err: err}
return
}
......@@ -327,15 +327,12 @@ loop:
log.Error(ctx, r.err)
if !xcontext.Canceled(errors.Cause(r.err)) {
log.Infof(ctx, "%v: closing link", r.node.Link)
log.Infof(ctx, "%v: closing link", r.stor.Link)
// close stor link / update .nodeTab
err := r.node.Link.Close()
if err != nil {
log.Error(ctx, err)
}
lclose(ctx, r.stor.Link)
m.nodeTab.SetNodeState(r.node, neo.DOWN)
m.nodeTab.SetNodeState(r.stor, neo.DOWN)
}
} else {
......@@ -418,7 +415,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
}
// on error provide feedback to storRecovery chan
res <- storRecovery{node: stor, err: err}
res <- storRecovery{stor: stor, err: err}
}()
defer runningf(&ctx, "%s: stor recovery", stor.Link)(&err)
......@@ -463,7 +460,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
}
}
res <- storRecovery{node: stor, partTab: pt}
res <- storRecovery{stor: stor, partTab: pt}
}
......@@ -923,7 +920,7 @@ func (m *Master) reject(ctx context.Context, conn *neo.Conn, resp neo.Msg) {
// accept sends acceptive identification response and closes conn
// XXX if problem -> .nodeLeave
// XXX spawn ping goroutine from here?
func (m *Master) accept(conn *neo.Conn, resp neo.Msg) error {
func (m *Master) accept(ctx context.Context, conn *neo.Conn, resp neo.Msg) error {
// XXX cancel on ctx
err1 := conn.Send(resp)
err2 := conn.Close()
......
......@@ -118,7 +118,7 @@ func (stor *Storage) Run(ctx context.Context) error {
case <-serveCtx.Done():
// shutdown
conn.Link().Close() // XXX log err ?
lclose(serveCtx, conn.Link())
return
}
}
......
......@@ -23,6 +23,7 @@ package server
import (
"context"
"fmt"
"io"
"lab.nexedi.com/kirr/neo/go/xcommon/task"
"lab.nexedi.com/kirr/neo/go/xcommon/log"
......@@ -67,3 +68,13 @@ func _running(ctxp *context.Context, name string) func(*error) {
task.ErrContext(errp, ctx)
}
}
// lclose closes c and logs closing error if there was any.
// the error is otherwise ignored
func lclose(ctx context.Context, c io.Closer) {
err := c.Close()
if err != nil {
log.Error(ctx, err)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment