Commit d41f328d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 48e38f91
...@@ -150,9 +150,9 @@ func (m *Master) Run(ctx context.Context) error { ...@@ -150,9 +150,9 @@ func (m *Master) Run(ctx context.Context) error {
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
m.logf("serving on %s ...", l.Addr())
m.node.MasterAddr = l.Addr().String() m.node.MasterAddr = l.Addr().String()
m.logf("serving on %s ...", m.node.MasterAddr)
// accept incoming connections and pass them to main driver // accept incoming connections and pass them to main driver
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
...@@ -231,7 +231,7 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -231,7 +231,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// XXX shutdown ? // XXX shutdown ?
} }
return fmt.Errorf("master: run: %v\n", ctx.Err()) // XXX run -> runmain ? return errors.WithMessage(ctx.Err(), "master: run")
} }
...@@ -259,7 +259,8 @@ type storRecovery struct { ...@@ -259,7 +259,8 @@ type storRecovery struct {
// - !nil: recovery was cancelled // - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) (err error) { func (m *Master) recovery(ctx context.Context) (err error) {
m.log("recovery") m.log("recovery")
defer xerr.Context(&err, "master: recovery") //defer xerr.Context(&err, "master: recovery")
defer m.errctx(&err, "recovery")
m.setClusterState(neo.ClusterRecovering) m.setClusterState(neo.ClusterRecovering)
rctx, rcancel := context.WithCancel(ctx) rctx, rcancel := context.WithCancel(ctx)
...@@ -299,17 +300,17 @@ loop: ...@@ -299,17 +300,17 @@ loop:
err := m.accept(n.conn, resp) err := m.accept(n.conn, resp)
if err != nil { if err != nil {
// XXX better m.nodeLeave <- nodeLeave{node, err} ?
// XXX move this m.nodeLeave <- to accept() ? // XXX move this m.nodeLeave <- to accept() ?
recovery <- storRecovery{node: node, err: err} recovery <- storRecovery{node: node, err: err}
return
} }
// start recovery // start recovery
storCtlRecovery(rctx, node, recovery) storCtlRecovery(rctx, node, recovery)
}() }()
// XXX calrify who sends here
/* /*
// XXX calrify who sends here
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
// TODO close n.node.Link // TODO close n.node.Link
// //
...@@ -324,16 +325,17 @@ loop: ...@@ -324,16 +325,17 @@ loop:
if r.err != nil { if r.err != nil {
m.logf("%v", r.err) m.logf("%v", r.err)
// XXX only if not cancelled if !xcontext.Canceled(errors.Cause(r.err)) {
m.logf("master: %v: closing link", r.node.Link) m.logf("%v: closing link", r.node.Link)
// close stor link / update .nodeTab // close stor link / update .nodeTab
err := r.node.Link.Close() err := r.node.Link.Close()
if err != nil { if err != nil {
m.logf("master: %v\n", r.node.Link) m.logf("master: %v\n", r.node.Link)
} }
m.nodeTab.SetNodeState(r.node, DOWN) m.nodeTab.SetNodeState(r.node, DOWN)
}
} else { } else {
// we are interested in latest partTab // we are interested in latest partTab
...@@ -393,10 +395,7 @@ loop: ...@@ -393,10 +395,7 @@ loop:
// we do not care errors here - they are either cancelled or IO errors // we do not care errors here - they are either cancelled or IO errors
// we just log them and return - in case it is IO error // we just log them and return - in case it is IO error
// on link it will be caught on next send/recv XXX // on link it will be caught on next send/recv XXX
switch errors.Cause(r.err) { if !xcontext.Canceled(errors.Cause(r.err)) {
case context.Canceled, context.DeadlineExceeded:
// ok
default:
// XXX not so ok // XXX not so ok
} }
...@@ -1267,7 +1266,7 @@ func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) { ...@@ -1267,7 +1266,7 @@ func (m *Master) ServeMaster(ctx context.Context, conn *neo.Conn) {
func (m *Master) logf(format string, argv ...interface{}) { func (m *Master) logf(format string, argv ...interface{}) {
// TODO support custom log.Logger // TODO support custom log.Logger
log.Output(2, fmt.Sprintf("master @%v: " + format, append([]string{m.MasterAddr}, argv...)...)) log.Output(2, fmt.Sprintf("master(%v): " + format, append([]string{m.MasterAddr}, argv...)...))
} }
func (m *Master) vlogf(format string, argv ...interface{}) { func (m *Master) vlogf(format string, argv ...interface{}) {
...@@ -1277,3 +1276,12 @@ func (m *Master) vlogf(format string, argv ...interface{}) { ...@@ -1277,3 +1276,12 @@ func (m *Master) vlogf(format string, argv ...interface{}) {
} }
m.log(format, argv) m.log(format, argv)
} }
func (m *Master) errctx(errp *error, context string) {
if *errp == nil {
return
}
xerr.Contextf(errp, "master(%v): %s", m.node.MasterAddr, context)
}
...@@ -92,6 +92,7 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -92,6 +92,7 @@ func (stor *Storage) Run(ctx context.Context) error {
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
stor.logf("serving on %s ...", l.Addr())
// start serving incoming connections // start serving incoming connections
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
......
...@@ -120,3 +120,18 @@ func (mc *mergeCtx) Value(key interface{}) interface{} { ...@@ -120,3 +120,18 @@ func (mc *mergeCtx) Value(key interface{}) interface{} {
} }
return mc.ctx2.Value(key) return mc.ctx2.Value(key)
} }
// Cancelled reports whether an error is due to a canceled context.
//
// Since both cancellation - explicit and due to exceeding context deadline -
// result in the same signal to work being done under context, Canceled treats
// both context.Canceled and context.DeadlineExceeded as errors indicating
// context cancellation.
func Canceled(err error) bool {
switch err {
case context.Canceled, context.DeadlineExceeded:
return true
}
return false
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment