Commit 9f4acd8d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d74c9d41
...@@ -146,13 +146,15 @@ func (m *Master) setClusterState(state neo.ClusterState) { ...@@ -146,13 +146,15 @@ func (m *Master) setClusterState(state neo.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error // Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error { func (m *Master) Run(ctx context.Context) (err error) {
// start listening // start listening
l, err := m.node.Listen() l, err := m.node.Listen()
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
log.Infof(ctx, "serving on %s ...", l.Addr())
defer running(&ctx, "master(%v)", l.Addr())(&err)
log.Info(ctx, "serving ...")
m.node.MasterAddr = l.Addr().String() m.node.MasterAddr = l.Addr().String()
...@@ -190,13 +192,13 @@ func (m *Master) Run(ctx context.Context) error { ...@@ -190,13 +192,13 @@ func (m *Master) Run(ctx context.Context) error {
l.Close() // XXX log err ? l.Close() // XXX log err ?
wg.Wait() wg.Wait()
return err // XXX errctx return err
} }
// runMain is the process which implements main master cluster management logic: node tracking, cluster // runMain is the process which implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) runMain(ctx context.Context) (err error) { func (m *Master) runMain(ctx context.Context) (err error) {
defer running(&ctx, "run")(&err) // XXX needed? defer running(&ctx, "main")(&err)
// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state // NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
...@@ -209,7 +211,7 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -209,7 +211,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// a command came to us to start the cluster. // a command came to us to start the cluster.
err := m.recovery(ctx) err := m.recovery(ctx)
if err != nil { if err != nil {
log.Error(ctx, err) //log.Error(ctx, err)
return err // recovery cancelled return err // recovery cancelled
} }
...@@ -217,20 +219,18 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -217,20 +219,18 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// case previously it was unclean shutdown. // case previously it was unclean shutdown.
err = m.verify(ctx) err = m.verify(ctx)
if err != nil { if err != nil {
log.Error(ctx, err) //log.Error(ctx, err)
continue // -> recovery continue // -> recovery
} }
// provide service as long as partition table stays operational // provide service as long as partition table stays operational
err = m.service(ctx) err = m.service(ctx)
if err != nil { if err != nil {
log.Error(ctx, err) //log.Error(ctx, err)
continue // -> recovery continue // -> recovery
} }
// XXX could err be == nil here - after service finishes ? // XXX shutdown request ?
// XXX shutdown ?
} }
return ctx.Err() return ctx.Err()
...@@ -242,16 +242,17 @@ func (m *Master) runMain(ctx context.Context) (err error) { ...@@ -242,16 +242,17 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// //
// - starts from potentially no storage nodes known // - starts from potentially no storage nodes known
// - accept connections from storage nodes // - accept connections from storage nodes
// - retrieve and recovery latest previously saved partition table from storages // - retrieve and recover latest previously saved partition table from storages
// - monitor whether partition table becomes operational wrt currently up nodeset // - monitor whether partition table becomes operational wrt currently up nodeset
// - if yes - finish recovering upon receiving "start" command XXX or autostart // - if yes - finish recovering upon receiving "start" command XXX or autostart
// storRecovery is result of 1 storage node passing recovery phase // storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct { type storRecovery struct {
node *neo.Node stor *neo.Node
partTab neo.PartitionTable partTab neo.PartitionTable
// XXX + backup_tid, truncate_tid ?
err error err error
// XXX + backup_tid, truncate_tid ?
} }
// recovery drives cluster during recovery phase // recovery drives cluster during recovery phase
......
...@@ -28,6 +28,9 @@ import ( ...@@ -28,6 +28,9 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/log" "lab.nexedi.com/kirr/neo/go/xcommon/log"
) )
// XXX -> task (and current task -> taskctx) ?
// running is syntactic sugar to push new task to operational stack, log it and // running is syntactic sugar to push new task to operational stack, log it and
// adjust error return with task prefix. // adjust error return with task prefix.
// //
...@@ -46,12 +49,21 @@ func runningf(ctxp *context.Context, format string, argv ...interface{}) func(*e ...@@ -46,12 +49,21 @@ func runningf(ctxp *context.Context, format string, argv ...interface{}) func(*e
func _running(ctxp *context.Context, name string) func(*error) { func _running(ctxp *context.Context, name string) func(*error) {
ctx := task.Running(*ctxp, name) ctx := task.Running(*ctxp, name)
*ctxp = ctx *ctxp = ctx
log.Depth(2).Info(ctx) // XXX level = ok? log.Depth(2).Info(ctx, "start")
return func(errp *error) { return func(errp *error) {
if *errp != nil {
// XXX is it good idea to log to error here? (not in above layer)
// XXX what is error here could be not so error above
// XXX or we still want to log all errors - right?
log.Depth(1).Error(ctx, *errp)
} else {
log.Depth(1).Info(ctx, "ok")
}
// XXX do we need vvv if we log it anyway ^^^ ?
// NOTE not *ctxp here - as context pointed by ctxp could be // NOTE not *ctxp here - as context pointed by ctxp could be
// changed when this deferred function is run // changed when this deferred function is run
task.ErrContext(errp, ctx) task.ErrContext(errp, ctx)
// XXX also log task stop here?
} }
} }
...@@ -35,6 +35,9 @@ import ( ...@@ -35,6 +35,9 @@ import (
// //
// Canceling this context releases resources associated with it, so code should // Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete. // call cancel as soon as the operations running in this Context complete.
//
// XXX let Merge do only merge, not create another cancel; optimize it for
// cases when a source context is not cancellable
func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) { func Merge(ctx1, ctx2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{ mc := &mergeCtx{
ctx1: ctx1, ctx1: ctx1,
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment