Commit 4d71333f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 06158f50
...@@ -162,9 +162,11 @@ func (m *Master) run(ctx context.Context) { ...@@ -162,9 +162,11 @@ func (m *Master) run(ctx context.Context) {
// recovery drives cluster during recovery phase // recovery drives cluster during recovery phase
// //
// when recovery finishes error indicates: // when recovery finishes error indicates:
// - nil: recovery was ok and a command came for cluster to start XXX or autostart // - nil: recovery was ok and a command came for cluster to start
// - !nil: recovery was cancelled // - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) (err error) { func (m *Master) recovery(ctx context.Context) (err error) {
defer errcontextf(&err, "master: recovery")
m.setClusterState(ClusterRecovering) m.setClusterState(ClusterRecovering)
rctx, rcancel := context.WithCancel(ctx) rctx, rcancel := context.WithCancel(ctx)
defer rcancel() defer rcancel()
...@@ -184,7 +186,7 @@ loop: ...@@ -184,7 +186,7 @@ loop:
for { for {
select { select {
case n := <-m.nodeCome: case n := <-m.nodeCome:
node, ok := m.accept(n, /* XXX do not accept clients */) node, ok := m.accept(n, /* XXX only accept storages -> PENDING */)
if !ok { if !ok {
break break
} }
...@@ -227,8 +229,7 @@ loop: ...@@ -227,8 +229,7 @@ loop:
// XXX ok? we want to retrieve all recovery information first? // XXX ok? we want to retrieve all recovery information first?
// XXX or initially S is in PENDING state and // XXX or initially S is in PENDING state and
// transitions to RUNNING only after successful // transitions to RUNNING only after successful recovery?
// recovery?
rcancel() rcancel()
defer func() { defer func() {
...@@ -326,18 +327,24 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) ...@@ -326,18 +327,24 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
} }
var errStopRequested = errors.New("stop requested")
var errClusterDegraded = errors.New("cluster became non-operatonal")
// Cluster Verification // Cluster Verification
// -------------------- // --------------------
// //
// - starts with operational parttab // - starts with operational partition table
// - tell all storages to perform data verification (TODO) and retrieve last ids // - tell all storages to perform data verification (TODO) and retrieve last ids
// - once we are done without loosing too much storages in the process (so that // - once we are done without loosing too much storages in the process (so that
// parttab is still operational) we are ready to enter servicing state. // partition table is still operational) we are ready to enter servicing state.
// verify drives cluster via verification phase // verify drives cluster via verification phase
// //
// prerequisite for start: .partTab is operational wrt .nodeTab // prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) (err error) { func (m *Master) verify(ctx context.Context) (err error) {
defer errcontextf(&err, "master: verify")
m.setClusterState(ClusterVerifying) m.setClusterState(ClusterVerifying)
vctx, vcancel := context.WithCancel(ctx) vctx, vcancel := context.WithCancel(ctx)
defer vcancel() defer vcancel()
...@@ -345,39 +352,57 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -345,39 +352,57 @@ func (m *Master) verify(ctx context.Context) (err error) {
verify := make(chan storVerify) verify := make(chan storVerify)
inprogress := 0 inprogress := 0
// XXX ask every storage for verify and wait for _all_ them to complete? // NOTE we don't reset m.lastOid / m.lastTid to 0 in the beginning of verification
// XXX do we need to reset m.lastOid / m.lastTid to 0 in the beginning? // with the idea that XXX
// XXX ask every storage to verify and wait for _all_ them to complete?
// start verification on all storages we are currently in touch with // start verification on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() { for _, stor := range m.nodeTab.StorageList() {
// XXX check state > DOWN if stor.NodeState > DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++ inprogress++
go storCtlVerify(vctx, stor.Link, verify) go storCtlVerify(vctx, stor.Link, verify)
} }
}
loop: loop:
for inprogress > 0 { for inprogress > 0 {
select { select {
case n := <-m.nodeCome: case n := <-m.nodeCome:
// TODO node, ok := m.accept(n, /* XXX only accept storages -> known ? RUNNING : PENDING */)
_ = n if !ok {
break
}
// new storage arrived - start verification on it too
// XXX ok? or it must first go through recovery check?
inprogress++
go storCtlVerify(vctx, node.Link, verify)
case n := <-m.nodeLeave: case n := <-m.nodeLeave:
// TODO m.nodeTab.UpdateLinkDown(n.link)
_ = n
// if cluster became non-operational - we cancel verification
if !m.partTab.OperationalWith(&m.nodeTab) {
// XXX ok to instantly cancel? or better
// graceful shutdown in-flight verifications?
vcancel()
err = errClusterDegraded
break loop
}
// a storage node came through verification - TODO
case v := <-verify: case v := <-verify:
inprogress-- inprogress--
if v.err != nil { if v.err != nil {
fmt.Printf("master: %v\n", v.err) // XXX err ctx fmt.Printf("master: verify: %v\n", v.err)
// XXX mark S as non-working in nodeTab // XXX mark S as non-working in nodeTab
// check partTab is still operational // check partTab is still operational
// if not -> cancel to go back to recovery // if not -> cancel to go back to recovery
if m.partTab.OperationalWith(&m.nodeTab) { if m.partTab.OperationalWith(&m.nodeTab) {
vcancel() vcancel()
err = fmt.Errorf("cluster became non-operational in the process") err = errClusterDegraded
break loop break loop
} }
} else { } else {
...@@ -395,7 +420,7 @@ loop: ...@@ -395,7 +420,7 @@ loop:
case ech := <-m.ctlStop: case ech := <-m.ctlStop:
ech <- nil // ok ech <- nil // ok
err = fmt.Errorf("stop requested") err = errStopRequested
break loop break loop
case <-ctx.Done(): case <-ctx.Done():
...@@ -404,17 +429,11 @@ loop: ...@@ -404,17 +429,11 @@ loop:
} }
} }
if err != nil {
// XXX -> err = fmt.Errorf("... %v", err)
fmt.Printf("master: verify: %v\n", err)
// consume left verify responses (which should come without delay since it was cancelled) // consume left verify responses (which should come without delay since it was cancelled)
for ; inprogress > 0; inprogress-- { for ; inprogress > 0; inprogress-- {
<-verify <-verify
} }
}
// XXX -> return via channel ?
return err return err
} }
...@@ -475,6 +494,7 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) { ...@@ -475,6 +494,7 @@ func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
// service drives cluster during running state // service drives cluster during running state
// //
// prerequisite for start: .partTab is operational wrt .nodeTab and verification passed (XXX)
func (m *Master) service(ctx context.Context) (err error) { func (m *Master) service(ctx context.Context) (err error) {
m.setClusterState(ClusterRunning) m.setClusterState(ClusterRunning)
...@@ -864,7 +884,7 @@ func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) { ...@@ -864,7 +884,7 @@ func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) {
// # (via changing m.clusterState and relying on broadcast ?) // # (via changing m.clusterState and relying on broadcast ?)
// >NotifyClusterInformation (cluster_state=VERIFYING) // >NotifyClusterInformation (cluster_state=VERIFYING)
// //
// # (via changing partTab and relying on broadcast ?) // # (via changing partTab and relying on broadcast ?) -> no sends whole PT initially
// >NotifyPartitionTable (ptid=1, `node 0: S1, R`) // >NotifyPartitionTable (ptid=1, `node 0: S1, R`)
// # S saves PT info locally XXX -> after StartOperation ? // # S saves PT info locally XXX -> after StartOperation ?
// //
......
...@@ -81,8 +81,7 @@ type NodeTable struct { ...@@ -81,8 +81,7 @@ type NodeTable struct {
// Node represents a node entry in NodeTable // Node represents a node entry in NodeTable
type Node struct { type Node struct {
//Info NodeInfo // XXX extract ? XXX -> embedd NodeInfo // XXX good idea to embed ?
NodeInfo
Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ? Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ?
// XXX identified or not ? // XXX identified or not ?
......
...@@ -151,6 +151,11 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool { ...@@ -151,6 +151,11 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
switch cell.CellState { switch cell.CellState {
case UP_TO_DATE, FEEDING: // XXX cell.isReadble in py case UP_TO_DATE, FEEDING: // XXX cell.isReadble in py
// cell says it is readable. let's check whether corresponding node is up // cell says it is readable. let's check whether corresponding node is up
// FIXME checking whether it is up is not really enough -
// - what is needed to check is that data on that node is up
// to last_tid.
//
// We leave it as is for now.
node := nt.Get(cell.NodeUUID) node := nt.Get(cell.NodeUUID)
if node == nil || node.NodeState != RUNNING { // XXX PENDING is also ok ? if node == nil || node.NodeState != RUNNING { // XXX PENDING is also ok ?
continue continue
......
...@@ -263,7 +263,7 @@ func Expect(conn *Conn, msg NEODecoder) error { ...@@ -263,7 +263,7 @@ func Expect(conn *Conn, msg NEODecoder) error {
return errDecode(&errResp) // XXX err ctx return errDecode(&errResp) // XXX err ctx
} }
return fmt.Errorf("unexpected packet: %T", msgType) // XXX err ctx return fmt.Errorf("unexpected packet: %T", msgType) // XXX err ctx -> + conn ?
} }
_, err = msg.NEODecode(pkt.Payload()) _, err = msg.NEODecode(pkt.Payload())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment