Commit 3c25e926 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9d959716
......@@ -432,6 +432,37 @@ func (m *Master) recovery(ctx context.Context) (err error) {
loop:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
// request to start the cluster - if ok we exit replying ok
// if not ok - we just reply not ok
case ech := <-m.ctlStart:
if readyToStart {
log.Infof(ctx, "start command - we are ready")
// reply "ok to start" after whole recovery finishes
// XXX ok? we want to retrieve all recovery information first?
// XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful recovery?
rcancel()
defer func() {
// XXX can situation change while we are shutting down?
// XXX -> recheck logic with checking PT operational ^^^
// XXX (depending on storages state)
ech <- nil
}()
break loop
}
log.Infof(ctx, "start command - err - we are not ready")
ech <- fmt.Errorf("start: cluster is non-operational")
case ech := <-m.ctlStop:
close(ech) // ok; we are already recovering
// new connection comes in
case n := <-m.nodeComeq:
peer, ok := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
......@@ -441,6 +472,18 @@ loop:
// if new storage arrived - start recovery on it too
inprogress++
peer.wg.Go(func(peerCtx context.Context) error {
err := m.accept(peerCtx, peer)
if err != nil {
return err // XXX -> recoveryq
}
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
//defer cancel()
})
wg.Add(1)
go func() {
defer wg.Done()
......@@ -513,62 +556,7 @@ loop:
updateReadyToStart()
/*
// XXX move -> updateReadyToStart?
// update indicator whether cluster currently can be operational or not
var ready bool
if m.node.State.PartTab.PTid == 0 {
// new cluster - allow startup if we have some storages passed
// recovery and there is no in-progress recovery running
nup := 0
for _, stor := range m.node.State.NodeTab.StorageList() {
if stor.State > proto.DOWN {
nup++
}
}
ready = (nup > 0 && inprogress == 0)
} else {
ready = m.node.State.PartTab.OperationalWith(m.node.State.NodeTab) // XXX + node state
}
if readyToStart != ready {
readyToStart = ready
traceMasterStartReady(m, ready)
}
*/
// request to start the cluster - if ok we exit replying ok
// if not ok - we just reply not ok
case ech := <-m.ctlStart:
if readyToStart {
log.Infof(ctx, "start command - we are ready")
// reply "ok to start" after whole recovery finishes
// XXX ok? we want to retrieve all recovery information first?
// XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful recovery?
rcancel()
defer func() {
// XXX can situation change while we are shutting down?
// XXX -> recheck logic with checking PT operational ^^^
// XXX (depending on storages state)
ech <- nil
}()
break loop
}
log.Infof(ctx, "start command - err - we are not ready")
ech <- fmt.Errorf("start: cluster is non-operational")
case ech := <-m.ctlStop:
close(ech) // ok; we are already recovering
case <-ctx.Done():
err = ctx.Err()
break loop
}
}
......@@ -716,6 +704,18 @@ func (m *Master) verify(ctx context.Context) (err error) {
loop:
for inprogress > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case ech := <-m.ctlStart:
ech <- nil // we are already starting
case ech := <-m.ctlStop:
close(ech) // ok
err = errStopRequested
break loop
case n := <-m.nodeComeq:
node, state0, resp := m.identify(ctx, n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
......@@ -784,18 +784,6 @@ loop:
m.lastTid = v.lastTid
}
}
case ech := <-m.ctlStart:
ech <- nil // we are already starting
case ech := <-m.ctlStop:
close(ech) // ok
err = errStopRequested
break loop
case <-ctx.Done():
err = ctx.Err()
break loop
}
}
......@@ -929,6 +917,19 @@ func (m *Master) service(ctx context.Context) (err error) {
loop:
for {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case ech := <-m.ctlStart:
ech <- nil // we are already started
case ech := <-m.ctlStop:
close(ech) // ok
err = fmt.Errorf("stop requested")
// XXX tell storages to stop
break loop
// new connection comes in
case n := <-m.nodeComeq:
peer, ok := m.identify(ctx, n, /* XXX accept everyone */)
......@@ -975,21 +976,7 @@ loop:
}
*/
// XXX what else ? (-> txn control at least)
case ech := <-m.ctlStart:
ech <- nil // we are already started
case ech := <-m.ctlStop:
close(ech) // ok
err = fmt.Errorf("stop requested")
// XXX tell storages to stop
break loop
case <-ctx.Done():
err = ctx.Err()
break loop
}
}
......@@ -1334,6 +1321,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (peer *_MasteredPeer,
// accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX
// XXX +ctx?
func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request) error {
// XXX errctx?
err := idReq.Reply(p.accept)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment