Commit 06158f50 authored by Kirill Smelkov

.

parent 92c5fd44
......@@ -54,8 +54,6 @@ type Master struct {
ctlStop chan chan error // request to stop cluster
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ?
wantToStart chan chan error // main -> recovery
// channels from various workers to main driver
nodeCome chan nodeCome // node connected
nodeLeave chan nodeLeave // node disconnected
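
The ctl* channels above are typed chan chan error: a public method sends in a fresh error channel and then waits on it for the main driver's reply. A minimal standalone sketch of that request/response pattern (the names and the stop logic here are illustrative only, not the actual Master code):

	package main

	import (
		"errors"
		"fmt"
	)

	type driver struct {
		ctlStop chan chan error // request to stop; reply comes back on the inner chan
	}

	// Stop shows what a public method built on such a channel could look like.
	func (d *driver) Stop() error {
		ech := make(chan error)
		d.ctlStop <- ech // hand the driver a place to answer
		return <-ech     // wait for the driver's verdict
	}

	func (d *driver) run() {
		for ech := range d.ctlStop {
			// ... the actual stopping work would go here ...
			ech <- errors.New("stop: not implemented in this sketch")
		}
	}

	func main() {
		d := &driver{ctlStop: make(chan chan error)}
		go d.run()
		fmt.Println(d.Stop())
	}
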
......@@ -71,8 +69,7 @@ type nodeCome struct {
// node disconnects
type nodeLeave struct {
link *NodeLink
// XXX TODO
link *NodeLink // XXX better use uuid allocated on nodeCome ?
}
func NewMaster(clusterName string) *Master {
......@@ -80,16 +77,14 @@ func NewMaster(clusterName string) *Master {
m.nodeUUID = m.allocUUID(MASTER)
// TODO update nodeTab with self
m.clusterState = ClusterRecovering // XXX no elections - we are the only master
go m.run(context.TODO()) // XXX ctx
go m.run(context.TODO()) // XXX ctx
return m
}
// XXX NotifyNodeInformation to all nodes whenever nodetab changes
// Start requests cluster to eventually transition into running state
// it returns an error if such transition is not currently possible (e.g. partition table is not operational)
// it returns an error if such transition is not currently possible to begin (e.g. partition table is not operational)
// it returns nil if the transition began.
// NOTE upon successful return cluster is not yet in running state - the transition will
// take time and could also be automatically aborted due to cluster environment change (e.g.
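
A possible caller-side sketch of this contract (the import path and the surrounding setup are assumptions, not part of this change): a nil return from Start only means the transition began.

	package main

	import (
		"log"

		neo "lab.nexedi.com/kirr/neo/go/neo" // assumed import path; adjust to the real one
	)

	func main() {
		m := neo.NewMaster("mycluster")

		// ... let storage nodes connect so the partition table can become operational ...

		if err := m.Start(); err != nil {
			// the transition could not begin, e.g. the partition table is not operational
			log.Fatal(err)
		}

		// here the cluster is only transitioning to running; the transition takes
		// time and may still be aborted if the cluster environment changes
	}
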
......@@ -109,12 +104,15 @@ func (m *Master) Stop() error {
}
// Shutdown requests all known nodes in the cluster to stop
// XXX + master's run to finish ?
func (m *Master) Shutdown() error {
panic("TODO")
}
// setClusterState sets .clusterState and notifies subscribers
func (m *Master) setClusterState(state ClusterState) {
if state == m.clusterState {
if state == m.clusterState { // <- XXX do we really need this ?
return
}
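
A minimal sketch of what "set state and notify subscribers" could look like, assuming subscribers are plain channels (the real notification mechanism is not shown in this diff); the early return above then simply suppresses duplicate notifications when nothing actually changed:

	package main

	import "fmt"

	type ClusterState int

	const (
		ClusterRecovering ClusterState = iota // local sketch constants, not the real ones
		ClusterVerifying
	)

	type master struct {
		clusterState ClusterState
		subscribers  []chan ClusterState
	}

	func (m *master) setClusterState(state ClusterState) {
		if state == m.clusterState {
			return // nothing changed - no need to wake subscribers
		}
		m.clusterState = state
		for _, ch := range m.subscribers {
			ch <- state
		}
	}

	func main() {
		sub := make(chan ClusterState, 1)
		m := &master{clusterState: ClusterRecovering, subscribers: []chan ClusterState{sub}}
		m.setClusterState(ClusterVerifying)
		fmt.Println(<-sub) // 1
	}
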
......@@ -147,56 +145,8 @@ func (m *Master) run(ctx context.Context) {
continue // -> recovery
}
// XXX shutdown
}
/*
go m.recovery(ctx)
for {
select {
case <-ctx.Done():
// XXX -> shutdown
panic("TODO")
// command to start cluster
case c := <-m.ctlStart:
if m.clusterState != ClusterRecovering {
// start possible only from recovery
// XXX err ctx
c.resp <- fmt.Errorf("start: inappropriate current state: %v", m.clusterState)
break
}
ch := make(chan error)
select {
case <-ctx.Done():
// XXX how to avoid checking this ctx.Done everywhere?
c.resp <- ctx.Err()
panic("TODO")
case m.wantToStart <- ch:
}
err := <-ch
c.resp <- err
if err != nil {
break
}
// recovery said it is ok to start and finished - launch verification
m.setClusterState(ClusterVerifying)
go m.verify(ctx)
// command to stop cluster
case <-m.ctlStop:
// TODO
// command to shutdown
case <-m.ctlShutdown:
// TODO
}
// XXX shutdown ?
}
*/
}
......@@ -207,24 +157,24 @@ func (m *Master) run(ctx context.Context) {
// - accept connections from storage nodes
// - retrieve and recover latest previously saved partition table from storages
// - monitor whether partition table becomes operational wrt currently up nodeset
// - if yes - finish recovering upon receiving "start" command
// - if yes - finish recovering upon receiving "start" command XXX or autostart
// recovery drives cluster during recovery phase
//
// when recovery finishes, the error indicates:
// - nil: recovery was ok and a command came for cluster to start
// - nil: recovery was ok and a command came for cluster to start XXX or autostart
// - !nil: recovery was cancelled
func (m *Master) recovery(ctx context.Context) (err error) {
m.setClusterState(ClusterRecovering)
recovery := make(chan storRecovery)
rctx, rcancel := context.WithCancel(ctx)
defer rcancel()
recovery := make(chan storRecovery)
inprogress := 0
// start recovery on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
if stor.Info.NodeState > DOWN { // XXX state cmp ok ?
if stor.NodeState > DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
go storCtlRecovery(rctx, stor.Link, recovery)
}
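
The loop above fans the recovery request out to one goroutine per storage and tracks the count in inprogress; replies come back on the shared recovery channel and the whole group can be cancelled through rctx. A generic standalone sketch of this fan-out/fan-in shape (the worker and result types are illustrative, not the real storCtlRecovery/storRecovery machinery):

	package main

	import (
		"context"
		"fmt"
		"time"
	)

	// result of asking one worker to do its part of a phase
	type result struct {
		id  int
		err error
	}

	// worker does its part and reports back on res, honouring cancellation.
	func worker(ctx context.Context, id int, res chan result) {
		select {
		case <-ctx.Done():
			res <- result{id, ctx.Err()}
		case <-time.After(10 * time.Millisecond): // stand-in for real work
			res <- result{id, nil}
		}
	}

	func main() {
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel() // cancelling the phase stops all still-running workers

		res := make(chan result)
		inprogress := 0

		// fan out: one goroutine per known peer
		for id := 0; id < 3; id++ {
			inprogress++
			go worker(ctx, id, res)
		}

		// fan in: wait for everyone we started
		for ; inprogress > 0; inprogress-- {
			r := <-res
			fmt.Printf("worker %d: err=%v\n", r.id, r.err)
		}
	}
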
......@@ -387,13 +337,12 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
// verify drives cluster via verification phase
//
// prerequisite for start: .partTab is operational wrt .nodeTab
func (m *Master) verify(ctx context.Context) error {
func (m *Master) verify(ctx context.Context) (err error) {
m.setClusterState(ClusterVerifying)
var err error
verify := make(chan storVerify)
vctx, vcancel := context.WithCancel(ctx)
defer vcancel()
verify := make(chan storVerify)
inprogress := 0
// XXX ask every storage for verify and wait for _all_ them to complete?
......@@ -401,6 +350,7 @@ func (m *Master) verify(ctx context.Context) error {
// start verification on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
// XXX check state > DOWN
inprogress++
go storCtlVerify(vctx, stor.Link, verify)
}
......
......@@ -81,7 +81,8 @@ type NodeTable struct {
// Node represents a node entry in NodeTable
type Node struct {
Info NodeInfo // XXX extract ? XXX -> embedd
//Info NodeInfo // XXX extract ? XXX -> embedd
NodeInfo
Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ?
// XXX identified or not ?
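
Replacing the named Info field with an embedded NodeInfo is what lets the later hunks write node.NodeUUID instead of node.Info.NodeUUID: Go promotes an embedded struct's fields onto the outer type, while the embedded value stays assignable as a whole. A small standalone illustration (field types simplified to strings):

	package main

	import "fmt"

	type NodeInfo struct {
		NodeUUID  string // simplified types; the real ones are NEO-specific
		NodeType  string
		NodeState string
		Address   string
	}

	type Node struct {
		NodeInfo           // embedded: its fields are promoted onto Node
		Link     *struct{} // placeholder for *NodeLink
	}

	func main() {
		n := Node{NodeInfo: NodeInfo{NodeUUID: "S1", NodeType: "STORAGE"}}

		// promoted access - what the updated code uses
		fmt.Println(n.NodeUUID, n.NodeType)

		// the embedded struct is still addressable as a whole,
		// so assignments like node.NodeInfo = nodeInfo keep working
		n.NodeInfo = NodeInfo{NodeUUID: "S2", NodeType: "STORAGE", NodeState: "DOWN"}
		fmt.Println(n.NodeUUID, n.NodeState)
	}
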
......@@ -93,7 +94,7 @@ type Node struct {
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// FIXME linear scan
for _, node := range nt.nodev {
if node.Info.NodeUUID == uuid {
if node.NodeUUID == uuid {
return node
}
}
......@@ -111,10 +112,10 @@ func (nt *NodeTable) Update(nodeInfo NodeInfo, link *NodeLink) *Node {
nt.nodev = append(nt.nodev, node)
}
node.Info = nodeInfo
node.NodeInfo = nodeInfo
node.Link = link
nt.notify(node.Info)
nt.notify(node.NodeInfo)
return node
}
......@@ -141,9 +142,9 @@ func (nt *NodeTable) UpdateLinkDown(link *NodeLink) *Node {
panic("nodetab: UpdateLinkDown: no corresponding entry")
}
node.Info.NodeState = DOWN
node.NodeState = DOWN
nt.notify(node.Info)
nt.notify(node.NodeInfo)
return node
}
......@@ -153,7 +154,7 @@ func (nt *NodeTable) StorageList() []*Node {
// FIXME linear scan
sl := []*Node{}
for _, node := range nt.nodev {
if node.Info.NodeType == STORAGE {
if node.NodeType == STORAGE {
sl = append(sl, node)
}
}
......@@ -167,10 +168,9 @@ func (nt *NodeTable) String() string {
buf := bytes.Buffer{}
// XXX also for .storv
for _, node := range nt.nodev {
for _, n := range nt.nodev {
// XXX recheck output
i := node.Info
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", i.NodeUUID, i.NodeType, i.NodeState, i.Address)
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.NodeUUID, n.NodeType, n.NodeState, n.Address)
}
return buf.String()
......
......@@ -152,7 +152,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
case UP_TO_DATE, FEEDING: // XXX cell.isReadble in py
// cell says it is readable. let's check whether corresponding node is up
node := nt.Get(cell.NodeUUID)
if node == nil || node.Info.NodeState != RUNNING { // XXX PENDING is also ok ?
if node == nil || node.NodeState != RUNNING { // XXX PENDING is also ok ?
continue
}
......