Commit efb911ab authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent fc9a51ca
...@@ -388,9 +388,8 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -388,9 +388,8 @@ func (m *Master) recovery(ctx context.Context) (err error) {
wg := &sync.WaitGroup{} wg := &sync.WaitGroup{}
//trace:event traceMasterStartReady(m *Master, ready bool) //trace:event traceMasterStartReady(m *Master, ready bool)
readyToStart := false readyToStart := false // whether cluster currently can be operational or not
updateReadyToStart := func() { updateReadyToStart := func() {
// update indicator whether cluster currently can be operational or not
var ready bool var ready bool
if m.node.State.PartTab.PTid == 0 { if m.node.State.PartTab.PTid == 0 {
// new cluster - allow startup if we have some storages passed // new cluster - allow startup if we have some storages passed
...@@ -416,16 +415,35 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -416,16 +415,35 @@ func (m *Master) recovery(ctx context.Context) (err error) {
// XXX set cluster state = RECOVERY // XXX set cluster state = RECOVERY
// XXX down clients // XXX down clients
// goStorCtlRecovery spawns recovery task on a storage.
goStorCtlRecovery := func(stor *_MasteredPeer) {
inprogress++
// XXX wg.Add(1) + defer wg.Done() ?
stor.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel()
var pt *xneo.PartitionTable
err := stor.run(ctx, func(...) {
pt, err = storCtlRecovery(...)
})
ack := make(chan struct{})
recoveryq <- storRecovery{stor: stor, partTab: pt, err: err, ack: ack}
<-ack
// canceled recovery does not mean we should down the storage node
if xcontext.Canceled(err) {
err = nil
}
return err
})
}
// start recovery on all storages we are currently in touch with // start recovery on all storages we are currently in touch with
for _, stor := range m.node.State.NodeTab.StorageList() { for _, peer := range m.peerTab {
if stor.State > proto.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ? if peer.node.Type == proto.STORAGE {
inprogress++ goStorCtlRecovery(peer)
wg.Add(1)
go func() {
defer wg.Done()
storCtlRecovery(ctx, stor, recoveryq)
// XXX acj
}()
} }
} }
...@@ -486,31 +504,10 @@ loop: ...@@ -486,31 +504,10 @@ loop:
} }
// S -> start recovery // S -> start recovery
if peer.node.Type != proto.STORAGE { if peer.node.Type == proto.STORAGE {
break goStorCtlRecovery(peer)
} }
// XXX wg.Add(1) + defer wg.Done() ?
peer.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel()
var pt *xneo.PartitionTable
err := peer.run(ctx, func(...) {
pt, err = storCtlRecovery(...)
})
ack := make(chan struct{})
recoveryq <- storRecovery{stor: peer, partTab: pt, err: err, ack: ack}
<-ack
// canceled recovery does not mean we should down the peer
if xcontext.Canceled(err) {
err = nil
}
return err
})
// a storage node came through recovery - let's see whether // a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there // ptid ↑ and if so we should take partition table from there
case r := <-recoveryq: case r := <-recoveryq:
...@@ -530,8 +527,6 @@ loop: ...@@ -530,8 +527,6 @@ loop:
} }
updateReadyToStart() updateReadyToStart()
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment