Commit eb2becec authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 36d2c08e
......@@ -25,17 +25,24 @@ import (
"io"
"log"
"os"
"sync"
)
// Master is a node overseeing and managing how whole NEO cluster works
type Master struct {
clusterName string
clusterState ClusterState
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
nodeTab NodeTable
partTab PartitionTable
stateMu sync.RWMutex
nodeTab NodeTable
partTab PartitionTable
clusterState ClusterState
//nodeEventQ chan ... // for node connected / disconnected events
//txnCommittedQ chan ... // TODO for when txn is committed
}
func NewMaster(clusterName string) *Master {
......@@ -53,11 +60,45 @@ func (m *Master) SetClusterState(state ClusterState) {
// XXX actions ?
}
// run implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc
/*
func (m *Master) run(ctx context.Context) {
for {
select {
case <-ctx.Done():
panic("TODO")
case nodeEvent := <-m.nodeEventQ:
// TODO update nodeTab
// add info to nodeTab
m.nodeTab.Lock()
m.nodeTab.Add(&Node{nodeInfo, link})
m.nodeTab.Unlock()
// TODO notify nodeTab changes
// TODO consider adjusting partTab
// TODO consider how this maybe adjust cluster state
//case txnCommitted := <-m.txnCommittedQ:
}
}
}
*/
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("master: %s: serving new node\n", link)
//var node *Node
// close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
// terminate with an error )
......@@ -83,28 +124,161 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
return
}
// add info to nodeTab
m.nodeTab.Lock()
m.nodeTab.Add(&Node{nodeInfo, link})
m.nodeTab.Unlock()
// notify main logic node connects/disconnects
_ = nodeInfo
/*
node = &Node{nodeInfo, link}
m.nodeq <- node
defer func() {
node.state = DOWN
m.nodeq <- node
}()
*/
// subscribe to nodeTab/partTab/clusterState and notify peer with updates
m.stateMu.Lock()
//clusterCh := make(chan ClusterState)
nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered()
_ = nodeCh; _ = nodeUnsubscribe
//partCh, partUnsubscribe := m.partTab.SubscribeBuffered()
// TODO cluster subscribe
//m.clusterNotifyv = append(m.clusterNotifyv, clusterCh)
// TODO subscribe to nodeTab and broadcast updates:
//
// NotifyPartitionTable PM -> S, C
// PartitionChanges PM -> S, C // subset of NotifyPartitionTable (?)
// NotifyNodeInformation PM -> *
// TODO read initial nodeTab/partTab while still under lock
// TODO send later this initial content to peer
// TODO notify about cluster state changes
// ClusterInformation (PM -> * ?)
m.stateMu.Unlock()
/*
go func() {
var updates []...
}()
go func() {
var clusterState ClusterState
changed := false
select {
case clusterState = <-clusterCh:
changed = true
}()
*/
// identification passed, now serve other requests
// client: notify + serve requests
m.ServeClient(ctx, link)
// storage:
//
m.DriveStorage(ctx, link)
}
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
//
// NOTE(review): an older variant taking *Conn still exists further down in this
// file - presumably leftover from the refactoring; confirm and drop one of them.
//func (m *Master) ServeClient(ctx context.Context, conn *Conn) {
func (m *Master) ServeClient(ctx context.Context, link *NodeLink) {
	// TODO notify the client with initial state and serve its requests
	// (called from ServeLink after identification)
}
// ---- internal requests for storage driver ----
//
// These request types are intended to be sent to DriveStorage via a (planned)
// request channel - see the commented-out dispatch sketch in DriveStorage.
// Each carries a resp channel when the requester needs an answer back.

// storageRecovery asks storage driver to extract cluster recovery information from storage
type storageRecovery struct {
	resp chan PartitionTable // XXX +err ?
}

// storageVerify asks storage driver to perform verification (i.e. "data recovery") operation
type storageVerify struct {
	// XXX what is result ?
}

// storageStartOperation asks storage driver to start storage node operating
type storageStartOperation struct {
	resp chan error // XXX
}

// storageStopOperation asks storage driver to stop storage node operating
type storageStopOperation struct {
	resp chan error
}
// DriveStorage serves incoming connection on which peer identified itself as storage
//
// There are 2 connections:
// - notifications: unidirectional M -> S notifications (nodes, parttab, cluster state)
// - control: bidirectional M <-> S
//
// In control communication master always drives the exchange - talking first
// with e.g. a command or request and expects corresponding answer
//
// XXX +error return?
func (m *Master) DriveStorage(ctx context.Context, link *NodeLink) {
	// ? >UnfinishedTransactions
	// ? <AnswerUnfinishedTransactions (none currently)

	// TODO go for notify chan

	for {
		// with only the Done case armed, this select simply blocks until the
		// context is cancelled; the commented-out cases below sketch the
		// planned dispatch of storage* requests (see types above)
		select {
		case <-ctx.Done():
			return // XXX recheck

		// // request from master to do something
		// case mreq := <-xxx:
		// 	switch mreq := mreq.(type) {
		// 	case storageRecovery:

		// 	case storageVerify:
		// 		// TODO

		// 	case storageStartOperation:
		// 		// XXX timeout ?

		// 		// XXX -> chat2 ?
		// 		err = EncodeAndSend(conn, &StartOperation{Backup: false /* XXX hardcoded */})
		// 		if err != nil {
		// 			// XXX err
		// 		}

		// 		pkt, err := RecvAndDecode(conn)
		// 		if err != nil {
		// 			// XXX err
		// 		}

		// 		switch pkt := pkt.(type) {
		// 		default:
		// 			err = fmt.Errorf("unexpected answer: %T", pkt)

		// 		case *NotifyReady:
		// 		}

		// 		// XXX better in m.nodeq ?
		// 		mreq.resp <- err // XXX err ctx

		// 	case storageStopOperation:
		// 		// TODO
		// 	}
		}
	}
// RECOVERY (master.recovery.RecoveryManager + master.handlers.identification.py)
// --------
// """
......@@ -166,35 +340,6 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// ...
//
// StopOperation PM -> S
}
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
//
// NOTE(review): duplicate of the *NodeLink variant earlier in this file -
// looks like a leftover from the refactoring; confirm which one stays.
func (m *Master) ServeClient(ctx context.Context, conn *Conn) {
	// TODO
}
// ServeStorage serves incoming connection on which peer identified itself as storage
// XXX +error return?
//
// NOTE(review): presumably superseded by DriveStorage above - kept only as a
// reference for the planned control-message exchange; confirm before deleting.
func (m *Master) ServeStorage(ctx context.Context, conn *Conn) {
	// TODO implement; the planned M <-> S exchange is:
	// >Recovery
	// <AnswerRecovery

	// ? >UnfinishedTransactions
	// ? <AnswerUnfinishedTransactions (none currently)

	// <NotifyReady
	// >StartOperation

	// >StopOperation (on shutdown)
}
func (m *Master) ServeAdmin(ctx context.Context, conn *Conn) {
......
......@@ -19,6 +19,8 @@ package neo
// node management & node table
import (
"bytes"
"fmt"
"sync"
)
......@@ -61,13 +63,17 @@ import (
// sure not to accept new connections -> XXX not needed - just stop listening
// first.
//
// XXX once a node was added to NodeTable its entry never deleted: if e.g. a
// connection to node is lost associated entry is marked as having DOWN (XXX or UNKNOWN ?) node
// state.
//
// NodeTable zero value is valid empty node table.
type NodeTable struct {
// users have to care locking explicitly
sync.RWMutex
nodev []Node
nodev []*Node
subscribev []chan *Node
ver int // ↑ for versioning XXX do we need this?
}
......@@ -81,6 +87,74 @@ type Node struct {
}
// Subscribe subscribes to NodeTable updates.
// It returns a channel via which updates will be delivered, together with a
// function that cancels the subscription and closes the channel.
func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) {
	ch = make(chan *Node) // XXX how to specify ch buf size if needed ?
	nt.subscribev = append(nt.subscribev, ch)

	unsubscribe = func() {
		// locate our channel in the subscriber list and cut it out
		for i := range nt.subscribev {
			if nt.subscribev[i] != ch {
				continue
			}
			nt.subscribev = append(nt.subscribev[:i], nt.subscribev[i+1:]...)
			close(ch)
			return
		}
		panic("XXX unsubscribe not subscribed channel")
	}

	return ch, unsubscribe
}
// SubscribeBuffered subscribes to NodeTable updates without blocking the updater.
// It returns a channel via which updates are delivered.
// The updates are sent to the destination in a non-blocking way - if the
// destination channel is not ready they will be buffered.
// It is the caller's responsibility to make sure such buffering does not grow up
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown
//
// must be called with stateMu held
func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) {
	in, unsubscribe := nt.Subscribe()
	ch = make(chan []*Node)
	go func() {
		// updates accumulated while the receiver was not ready
		var updatev []*Node
		shutdown := false // set once `in` is closed (i.e. after unsubscribe)
		for {
			// arm the send case only when there is something to send;
			// a nil channel makes that select case block forever
			out := ch
			if len(updatev) == 0 {
				if shutdown {
					// nothing to send and source channel closed
					// -> close destination and stop
					close(ch)
					break
				}
				out = nil
			}

			select {
			case update, ok := <-in:
				if !ok {
					// source closed; keep looping so any buffered
					// updates are still delivered before closing ch
					shutdown = true
					break // NOTE: leaves the select, not the for loop
				}

				// FIXME better merge as same node could be updated several times
				updatev = append(updatev, update)

			case out <- updatev:
				// receiver took the whole batch; start a fresh one
				updatev = nil
			}
		}
	}()
	return ch, unsubscribe
}
// UpdateNode updates information about a node
func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) {
// TODO
......@@ -90,7 +164,7 @@ func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) {
// Add adds node to the node table.
//
// XXX does not check whether the node is already there
// XXX pass/store node by pointer ?
func (nt *NodeTable) Add(node *Node) {
	// nodev holds *Node, so store the pointer itself; appending the
	// dereferenced value would lose the identity shared with callers.
	// (diff residue previously left both the value- and pointer-append here)
	nt.nodev = append(nt.nodev, node)

	// TODO notify all nodelink subscribers about new info
}
......@@ -98,6 +172,19 @@ func (nt *NodeTable) Add(node *Node) {
// TODO subscribe for changes on Add ? (notification via channel)
// Lookup finds node by nodeID.
// It returns nil when no node with such id is present in the table.
func (nt *NodeTable) Lookup(nodeID NodeID) *Node {
	// FIXME linear scan
	for i := range nt.nodev {
		candidate := nt.nodev[i]
		if candidate.Info.NodeID == nodeID {
			return candidate
		}
	}
	return nil
}
// XXX LookupByAddress ?
func (nt *NodeTable) String() string {
//nt.RLock() // FIXME -> it must be client
......@@ -105,9 +192,10 @@ func (nt *NodeTable) String() string {
buf := bytes.Buffer{}
for node := range nl.nodev {
for _, node := range nt.nodev {
// XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", node.NodeID, node.NodeType, node.NodeState, node.Address)
i := node.Info
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", i.NodeID, i.NodeType, i.NodeState, i.Address)
}
return buf.String()
......
......@@ -137,14 +137,14 @@ type PartitionCell struct {
//
// XXX or keep not only NodeID in PartitionCell - add *Node ?
func (pt *PartitionTable) Operational() bool {
for ptEntry := range pt.ptTab {
for _, ptEntry := range pt.ptTab {
if len(ptEntry) == 0 {
return false
}
ok := false
cellLoop:
for cell := range ptEntry {
for _, cell := range ptEntry {
switch cell.CellState {
case UP_TO_DATE, FEEDING: // XXX cell.isReadble in py
ok = true
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment