Commit 242c3d85 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent ff521f60
......@@ -190,6 +190,7 @@ func (m *Master) run(ctx context.Context) {
// Cluster Recovery
// ----------------
//
// - starts from potentially no storage nodes known
// - accept connections from storage nodes
// - retrieve and recovery latest previously saved partition table from storages
// - monitor whether partition table becomes operational wrt currently up nodeset
......@@ -200,18 +201,19 @@ func (m *Master) run(ctx context.Context) {
// NOTE during recovery phase `recovery()` owns .partTab and .nodeTab XXX or is the only mutator ?
func (m *Master) recovery(ctx context.Context) {
recovery := make(chan storRecovery)
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()
inprogress := 0
// start recovery on all storages we are currently in touch
for _, stor := range m.nodeTab.StorageList() {
if stor.Info.NodeState > DOWN { // XXX state cmp ok ?
inprogress++
go storCtlRecovery(ctx, stor.Link, recovery)
go storCtlRecovery(rctx, stor.Link, recovery)
}
}
loop:
// XXX really inprogrss > 0 ? (we should be here indefinitely until commanded to start)
//for inprogress > 0 {
for {
select {
case <-ctx.Done():
......@@ -224,29 +226,60 @@ loop:
break
}
// new storage arrived - start recovery on it too
inprogress++
go storCtlRecovery(ctx, node.Link, recovery)
go storCtlRecovery(rctx, node.Link, recovery)
case n := <-m.nodeLeave:
m.nodeTab.UpdateLinkDown(n.link)
// XXX update something indicating cluster currently can be operational or not ?
// result of a storage recovery
case r := <-recovery:
inprogress--
// XXX check r.err
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
fmt.Printf("master: %v\n", r.err)
break
}
// we are interested in latest partTab
// NOTE during recovery no one must be subscribed to
// partTab so it is ok to simply change whole m.partTab
if r.partTab.ptid > m.partTab.ptid {
m.partTab = r.partTab
// XXX also transfer subscribers ?
// XXX -> during recovery no one must be subscribed to partTab
}
// TODO
// XXX another channel from master: request "ok to start?" - if ok we reply ok and exit
// XXX update something indicating cluster currently can be operational or not ?
// request from master: request "ok to start?" - if ok we reply ok and exit
// if not ok - we just reply not ok
case s := <-...:
if m.partTab.OperationalWith(&m.nodeTab) {
// reply "ok to start" after whole recovery finishes
// XXX ok? we want to retreive all recovery information first?
// XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful
// recovery?
rcancel()
defer func() {
s.resp <- nil
}()
break loop
}
s.resp <- fmt.Errorf("cluster is non-operational")
}
}
// XXX consume left recovery responces
// consume left recovery responces (which should come without delay since it was cancelled)
for ; inprogress > 0; inprogress-- {
<-recovery
}
}
// storRecovery is result of a storage node passing recovery phase
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment