Commit ff521f60 authored by Kirill Smelkov

.

parent 46c7bdf6
...@@ -49,8 +49,9 @@ type Master struct { ...@@ -49,8 +49,9 @@ type Master struct {
clusterState ClusterState clusterState ClusterState
// channels controlling main driver // channels controlling main driver
ctlStart chan ctlStart // request to start cluster ctlStart chan ctlStart // request to start cluster
ctlStop chan ctlStop // request to stop cluster ctlStop chan ctlStop // request to stop cluster
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx too ?
// channels from various workers to main driver // channels from various workers to main driver
nodeCome chan nodeCome // node connected nodeCome chan nodeCome // node connected
...@@ -82,6 +83,8 @@ type nodeLeave struct { ...@@ -82,6 +83,8 @@ type nodeLeave struct {
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
m := &Master{clusterName: clusterName} m := &Master{clusterName: clusterName}
m.nodeUUID = m.allocUUID(MASTER)
// TODO update nodeTab with self
m.clusterState = ClusterRecovering // XXX no elections - we are the only master m.clusterState = ClusterRecovering // XXX no elections - we are the only master
go m.run(context.TODO()) // XXX ctx go m.run(context.TODO()) // XXX ctx
...@@ -100,7 +103,7 @@ func (m *Master) SetClusterState(state ClusterState) error { ...@@ -100,7 +103,7 @@ func (m *Master) SetClusterState(state ClusterState) error {
} }
*/ */
// run implements main master cluster management logic: node tracking, cluster // run implements main master cluster management logic: node tracking, cluster XXX -> only top-level
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) run(ctx context.Context) { func (m *Master) run(ctx context.Context) {
...@@ -113,6 +116,7 @@ func (m *Master) run(ctx context.Context) { ...@@ -113,6 +116,7 @@ func (m *Master) run(ctx context.Context) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX -> shutdown
panic("TODO") panic("TODO")
// command to start cluster // command to start cluster
...@@ -141,6 +145,9 @@ func (m *Master) run(ctx context.Context) { ...@@ -141,6 +145,9 @@ func (m *Master) run(ctx context.Context) {
case <-m.ctlStop: case <-m.ctlStop:
// TODO // TODO
// command to shutdown
case <-m.ctlShutdown:
/* /*
// node connects & requests identification // node connects & requests identification
case n := <-m.nodeCome: case n := <-m.nodeCome:
...@@ -184,36 +191,49 @@ func (m *Master) run(ctx context.Context) { ...@@ -184,36 +191,49 @@ func (m *Master) run(ctx context.Context) {
// ---------------- // ----------------
// //
// - accept connections from storage nodes // - accept connections from storage nodes
// - retrieve and recovery previously saved partition table from storages // - retrieve and recovery latest previously saved partition table from storages
// - monitor whether partition table becomes operational wrt currently up nodeset // - monitor whether partition table becomes operational wrt currently up nodeset
// - if yes - finish recovering upon receiving "start" command // - if yes - finish recovering upon receiving "start" command
// recovery is a process that drives cluster via recovery phase // recovery is a process that drives cluster via recovery phase
// //
// XXX draft: Cluster Recovery if []Stor is fixed // NOTE during recovery phase `recovery()` owns .partTab and .nodeTab XXX or is the only mutator ?
// NOTE during recovery phase `recovery()` owns m.partTab func (m *Master) recovery(ctx context.Context) {
// XXX what about .nodeTab ?
func (m *Master) recovery(ctx context.Context, storv []*NodeLink) {
recovery := make(chan storRecovery) recovery := make(chan storRecovery)
//wg := sync.WaitGroup{}
inprogress := 0 inprogress := 0
for _, stor := range storv { for _, stor := range m.nodeTab.StorageList() {
//wg.Add(1) if stor.Info.NodeState > DOWN { // XXX state cmp ok ?
inprogress++ inprogress++
go storCtlRecovery(ctx, stor, recovery) go storCtlRecovery(ctx, stor.Link, recovery)
}
} }
loop: loop:
// XXX really inprogrss > 0 ? (we should be here indefinitely until commanded to start) // XXX really inprogrss > 0 ? (we should be here indefinitely until commanded to start)
for inprogress > 0 { //for inprogress > 0 {
for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX // XXX
break loop break loop
case n := <-m.nodeCome:
node, ok := m.accept(n, /* XXX do not accept clients */)
if !ok {
break
}
inprogress++
go storCtlRecovery(ctx, node.Link, recovery)
case n := <-m.nodeLeave:
m.nodeTab.UpdateLinkDown(n.link)
case r := <-recovery: case r := <-recovery:
inprogress-- inprogress--
// XXX check r.err
if r.partTab.ptid > m.partTab.ptid { if r.partTab.ptid > m.partTab.ptid {
m.partTab = r.partTab m.partTab = r.partTab
// XXX also transfer subscribers ? // XXX also transfer subscribers ?
...@@ -227,20 +247,18 @@ loop: ...@@ -227,20 +247,18 @@ loop:
} }
// XXX consume left recovery responces // XXX consume left recovery responces
//wg.Wait()
} }
// storRecovery is result of a storage node passing recovery phase // storRecovery is result of a storage node passing recovery phase
type storRecovery struct { type storRecovery struct {
partTab PartitionTable partTab PartitionTable
// XXX + lastOid, lastTid, backup_tid, truncate_tid ? // XXX + backup_tid, truncate_tid ?
err error err error
} }
// storCtlRecovery drives a storage node during cluster recovering state // storCtlRecovery drives a storage node during cluster recovering state
// TODO text // it retrieves various ids and partition table as stored on the storage
func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) { func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) {
var err error var err error
defer func() { defer func() {
...@@ -249,7 +267,7 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) ...@@ -249,7 +267,7 @@ func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery)
} }
// XXX on err still provide feedback to storRecovery chan ? // XXX on err still provide feedback to storRecovery chan ?
res<- storRecovery{err: err} res <- storRecovery{err: err}
/* /*
fmt.Printf("master: %v", err) fmt.Printf("master: %v", err)
...@@ -444,15 +462,15 @@ func (m *Master) stop(ctx context.Context, storv []*NodeLink) { ...@@ -444,15 +462,15 @@ func (m *Master) stop(ctx context.Context, storv []*NodeLink) {
} }
// accept processes identification request of just connected node and either accepts or declines it // accept processes identification request of just connected node and either accepts or declines it
// if node identification is accepted nodeTab is updated and corresponding nodeInfo is returned // if node identification is accepted nodeTab is updated and corresponding node entry is returned
func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) { func (m *Master) accept(n nodeCome) (node *Node, ok bool) {
// XXX also verify ? : // XXX also verify ? :
// - NodeType valid // - NodeType valid
// - IdTimestamp ? // - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName { if n.idReq.ClusterName != m.clusterName {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return return nil, false
} }
nodeType := n.idReq.NodeType nodeType := n.idReq.NodeType
...@@ -463,12 +481,12 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) { ...@@ -463,12 +481,12 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
} }
// XXX uuid < 0 (temporary) -> reallocate if conflict ? // XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid) node = m.nodeTab.Get(uuid)
if node != nil { if node != nil {
// reject - uuid is already occupied by someone else // reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting // XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return return nil, false
} }
// XXX accept only certain kind of nodes depending on .clusterState, e.g. // XXX accept only certain kind of nodes depending on .clusterState, e.g.
...@@ -499,7 +517,7 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) { ...@@ -499,7 +517,7 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
nodeState = RUNNING nodeState = RUNNING
} }
nodeInfo = NodeInfo{ nodeInfo := NodeInfo{
NodeType: nodeType, NodeType: nodeType,
Address: n.idReq.Address, Address: n.idReq.Address,
NodeUUID: uuid, NodeUUID: uuid,
...@@ -507,9 +525,8 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) { ...@@ -507,9 +525,8 @@ func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
IdTimestamp: monotime(), IdTimestamp: monotime(),
} }
m.nodeTab.Update(nodeInfo) // NOTE this notifies al nodeTab subscribers node = m.nodeTab.Update(nodeInfo, n.link) // NOTE this notifies al nodeTab subscribers
return node, true
return nodeInfo, true
} }
// allocUUID allocates new node uuid for a node of kind nodeType // allocUUID allocates new node uuid for a node of kind nodeType
......
...@@ -72,15 +72,16 @@ type NodeTable struct { ...@@ -72,15 +72,16 @@ type NodeTable struct {
// users have to care locking explicitly // users have to care locking explicitly
//sync.RWMutex XXX needed ? //sync.RWMutex XXX needed ?
nodev []*Node //storv []*Node // storages
nodev []*Node // all other nodes
notifyv []chan NodeInfo // subscribers notifyv []chan NodeInfo // subscribers
ver int // ↑ for versioning XXX do we need this? //ver int // ↑ for versioning XXX do we need this?
} }
// Node represents a node entry in NodeTable // Node represents a node entry in NodeTable
type Node struct { type Node struct {
Info NodeInfo // XXX extract ? Info NodeInfo // XXX extract ? XXX -> embedd
Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ? Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ?
// XXX identified or not ? // XXX identified or not ?
...@@ -88,6 +89,102 @@ type Node struct { ...@@ -88,6 +89,102 @@ type Node struct {
} }
// Get looks up the node entry whose Info.NodeUUID equals uuid.
//
// The first matching entry in table order is returned; nil is returned when
// no entry carries that uuid.
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
	// FIXME linear scan
	for _, entry := range nt.nodev {
		if entry.Info.NodeUUID != uuid {
			continue
		}
		return entry
	}

	return nil
}
// XXX GetByAddress ?
// Update records nodeInfo and link for the node identified by nodeInfo.NodeUUID.
//
// When the table has no entry with that uuid yet, a fresh entry is appended.
// The entry's Info and Link are then overwritten, notify is invoked with the
// stored info, and the entry is returned for convenience.
func (nt *NodeTable) Update(nodeInfo NodeInfo, link *NodeLink) *Node {
	entry := nt.Get(nodeInfo.NodeUUID)
	if entry == nil {
		entry = new(Node)
		nt.nodev = append(nt.nodev, entry)
	}

	entry.Info, entry.Link = nodeInfo, link

	nt.notify(entry.Info)
	return entry
}
// GetByLink looks up the node entry whose Link is the given node-link.
//
// Links are compared by pointer value. The first matching entry in table
// order is returned; nil is returned when no entry matches.
// XXX is this a good idea ?
func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
	// FIXME linear scan
	for _, entry := range nt.nodev {
		if entry.Link != link {
			continue
		}
		return entry
	}

	return nil
}
// UpdateLinkDown marks the node that corresponds to link as DOWN.
//
// The entry is located via GetByLink; its Info.NodeState is set to DOWN,
// notify is invoked with the updated info, and the entry is returned for
// convenience. It panics when the table holds no entry for link.
// XXX is this a good idea ?
func (nt *NodeTable) UpdateLinkDown(link *NodeLink) *Node {
	entry := nt.GetByLink(link)
	if entry != nil {
		entry.Info.NodeState = DOWN
		nt.notify(entry.Info)
		return entry
	}

	// XXX vvv not good
	panic("nodetab: UpdateLinkDown: no corresponding entry")
}
// StorageList collects every entry of the table whose NodeType is STORAGE.
//
// Entries keep their table order. The result is an empty, non-nil slice
// when the table holds no storage entries.
func (nt *NodeTable) StorageList() []*Node {
	// FIXME linear scan
	storv := make([]*Node, 0)
	for _, entry := range nt.nodev {
		if entry.Info.NodeType != STORAGE {
			continue
		}
		storv = append(storv, entry)
	}

	return storv
}
// String renders the table as text, one line per node entry in table order.
//
// Each line has the form "<uuid> (<type>)\t<state>\t<address>\n".
func (nt *NodeTable) String() string {
	//nt.RLock()		// FIXME -> it must be client
	//defer nt.RUnlock()

	var out bytes.Buffer

	// XXX also for .storv
	for _, entry := range nt.nodev {
		// XXX recheck output
		info := &entry.Info
		fmt.Fprintf(&out, "%s (%s)\t%s\t%s\n", info.NodeUUID, info.NodeType, info.NodeState, info.Address)
	}

	return out.String()
}
// notify delivers nodeInfo to every subscriber channel in .notifyv, in order.
//
// Each delivery is a plain channel send on the subscriber's channel.
func (nt *NodeTable) notify(nodeInfo NodeInfo) {
	// XXX rlock for .notifyv ?
	subscribers := nt.notifyv
	for i := range subscribers {
		subscribers[i] <- nodeInfo
	}
}
// Subscribe subscribes to NodeTable updates // Subscribe subscribes to NodeTable updates
// it returns a channel via which updates will be delivered and unsubscribe function // it returns a channel via which updates will be delivered and unsubscribe function
// //
...@@ -156,65 +253,3 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func() ...@@ -156,65 +253,3 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []NodeInfo, unsubscribe func()
return ch, unsubscribe return ch, unsubscribe
} }
// Update records nodeInfo for the node identified by nodeInfo.NodeUUID.
//
// When the table has no entry with that uuid yet, a fresh entry is appended.
// The entry's Info is then overwritten and nodeInfo is sent to every
// subscriber channel in .notifyv, in order.
// XXX how to pass link into node?
func (nt *NodeTable) Update(nodeInfo NodeInfo) {
	entry := nt.Get(nodeInfo.NodeUUID)
	if entry == nil {
		entry = new(Node)
		nt.nodev = append(nt.nodev, entry)
	}
	entry.Info = nodeInfo

	// notify subscribers
	// XXX rlock for .notifyv ?
	subscribers := nt.notifyv
	for i := range subscribers {
		subscribers[i] <- nodeInfo
	}
}
/*
// XXX ? ^^^ Update is enough ?
func (nt *NodeTable) Add(node *Node) {
// XXX check node is already there
// XXX pass/store node by pointer ?
nt.nodev = append(nt.nodev, node)
// TODO notify all nodelink subscribers about new info
}
*/
// TODO subscribe for changes on Add ? (notification via channel)
// Get looks up the node entry whose Info.NodeUUID equals uuid.
//
// The first matching entry in table order is returned; nil is returned when
// no entry carries that uuid.
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
	// FIXME linear scan
	for _, entry := range nt.nodev {
		if entry.Info.NodeUUID != uuid {
			continue
		}
		return entry
	}

	return nil
}
// XXX GetByAddress ?
// String renders the table as text, one line per node entry in table order.
//
// Each line has the form "<uuid> (<type>)\t<state>\t<address>\n".
func (nt *NodeTable) String() string {
	//nt.RLock()		// FIXME -> it must be client
	//defer nt.RUnlock()

	var out bytes.Buffer

	for _, entry := range nt.nodev {
		// XXX recheck output
		info := &entry.Info
		fmt.Fprintf(&out, "%s (%s)\t%s\t%s\n", info.NodeUUID, info.NodeType, info.NodeState, info.Address)
	}

	return out.String()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment