Commit aaae4b15 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1257f0f3
...@@ -35,8 +35,14 @@ import ( ...@@ -35,8 +35,14 @@ import (
// Master is a node overseeing and managing how whole NEO cluster works // Master is a node overseeing and managing how whole NEO cluster works
type Master struct { type Master struct {
// XXX move -> nodeCommon?
// ---- 8< ----
myInfo neo.NodeInfo
clusterName string clusterName string
nodeUUID neo.NodeUUID
net xnet.Network // network we are sending/receiving on
masterAddr string // address of master
// ---- 8< ----
// last allocated oid & tid // last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ? // XXX how to start allocating oid from 0, not 1 ?
...@@ -76,12 +82,13 @@ type nodeLeave struct { ...@@ -76,12 +82,13 @@ type nodeLeave struct {
link *neo.NodeLink // XXX better use uuid allocated on nodeCome ? link *neo.NodeLink // XXX better use uuid allocated on nodeCome ?
} }
// NewMaster TODO ... // NewMaster creates new master node that is listening on serveAddr
// XXX ... call Run
func NewMaster(clusterName, serveAddr string, net xnet.Network) *Master { func NewMaster(clusterName, serveAddr string, net xnet.Network) *Master {
// XXX serveAddr + net // XXX serveAddr + net
m := &Master{clusterName: clusterName} m := &Master{clusterName: clusterName}
m.nodeUUID = m.allocUUID(neo.MASTER) m.myInfo.NodeUUID = m.allocUUID(neo.MASTER)
// TODO update nodeTab with self // TODO update nodeTab with self
m.clusterState = neo.ClusterRecovering // XXX no elections - we are the only master m.clusterState = neo.ClusterRecovering // XXX no elections - we are the only master
// go m.run(context.TODO()) // XXX ctx // go m.run(context.TODO()) // XXX ctx
...@@ -89,6 +96,44 @@ func NewMaster(clusterName, serveAddr string, net xnet.Network) *Master { ...@@ -89,6 +96,44 @@ func NewMaster(clusterName, serveAddr string, net xnet.Network) *Master {
return m return m
} }
// Run starts master node and runs it until ctx is cancelled or fatal error.
//
// It listens on the address currently stored in m.myInfo.Address, stores the
// actual listening address back into m.myInfo.Address, serves incoming
// connections via Serve in a background goroutine, and runs the main cluster
// management logic (runMain) in the calling goroutine.
//
// When runMain returns, serving is cancelled and Run waits for the serving
// goroutine to finish. The error returned is the one from runMain (or from
// the listen/address setup, if that failed).
func (m *Master) Run(ctx context.Context) error {
	// XXX dup wrt Storage.Run

	// start listening
	l, err := m.net.Listen(m.myInfo.Address.String()) // XXX ugly
	if err != nil {
		return err // XXX err ctx
	}

	// now we know our listening address (in case it was autobind before)
	// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
	//      listen("tcp6", ":1234") gives l.Addr [::]:1234
	//      -> host is never empty
	addr, err := neo.Addr(l.Addr())
	if err != nil {
		// Serve has not been started yet, so nothing else will release
		// the listener on this path.
		l.Close() // XXX err ?
		// XXX -> panic here ?
		return err // XXX err ctx
	}

	m.myInfo.Address = addr

	wg := sync.WaitGroup{}
	serveCtx, serveCancel := context.WithCancel(ctx)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Keep Serve's result in its own variable: writing to the outer
		// err here would race with the assignment from runMain below and
		// could overwrite the error that Run is supposed to return.
		serveErr := Serve(serveCtx, l, m)
		_ = serveErr // XXX what to do with err ?
	}()

	err = m.runMain(ctx)

	// stop serving and wait for the serving goroutine before returning
	serveCancel()
	wg.Wait()

	return err // XXX errctx
}
// Start requests cluster to eventually transition into running state // Start requests cluster to eventually transition into running state
// it returns an error if such transition is not currently possible to begin (e.g. partition table is not operational) // it returns an error if such transition is not currently possible to begin (e.g. partition table is not operational)
...@@ -125,9 +170,9 @@ func (m *Master) setClusterState(state neo.ClusterState) { ...@@ -125,9 +170,9 @@ func (m *Master) setClusterState(state neo.ClusterState) {
} }
// Run is the process which implements main master cluster management logic: node tracking, cluster // runMain is the process which implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes etc // state updates, scheduling data movement between storage nodes etc
func (m *Master) Run(ctx context.Context) (err error) { func (m *Master) runMain(ctx context.Context) (err error) {
//defer xerr.Context(&err, "master: run") //defer xerr.Context(&err, "master: run")
// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state // NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
...@@ -158,7 +203,7 @@ func (m *Master) Run(ctx context.Context) (err error) { ...@@ -158,7 +203,7 @@ func (m *Master) Run(ctx context.Context) (err error) {
// XXX shutdown ? // XXX shutdown ?
} }
return fmt.Errorf("master: run: %v\n", ctx.Err()) return fmt.Errorf("master: run: %v\n", ctx.Err()) // XXX run -> runmain ?
} }
...@@ -611,7 +656,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) { ...@@ -611,7 +656,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
n.idResp <- &neo.AcceptIdentification{ n.idResp <- &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
MyNodeUUID: m.nodeUUID, MyNodeUUID: m.myInfo.NodeUUID,
NumPartitions: 1, // FIXME hardcoded NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid, YourNodeUUID: uuid,
......
...@@ -21,6 +21,7 @@ package server ...@@ -21,6 +21,7 @@ package server
import ( import (
"context" "context"
"fmt" "fmt"
"sync"
"time" "time"
"../../neo" "../../neo"
...@@ -69,15 +70,13 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Network, zstor z ...@@ -69,15 +70,13 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Network, zstor z
// Run starts storage node and runs it until either ctx is cancelled or master // Run starts storage node and runs it until either ctx is cancelled or master
// commands it to shutdown. // commands it to shutdown.
func (stor *Storage) Run(ctx context.Context) error { func (stor *Storage) Run(ctx context.Context) error {
// XXX dup wrt Master.Run
// start listening // start listening
l, err := stor.net.Listen(stor.myInfo.Address.String()) // XXX ugly l, err := stor.net.Listen(stor.myInfo.Address.String()) // XXX ugly
if err != nil { if err != nil {
return err // XXX err ctx return err // XXX err ctx
} }
// FIXME -> no -> Serve closes l
defer l.Close() // XXX err ?
// now we know our listening address (in case it was autobind before) // now we know our listening address (in case it was autobind before)
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and // NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234 // listen("tcp6", ":1234") gives l.Addr [::]:1234
...@@ -90,29 +89,43 @@ func (stor *Storage) Run(ctx context.Context) error { ...@@ -90,29 +89,43 @@ func (stor *Storage) Run(ctx context.Context) error {
stor.myInfo.Address = addr stor.myInfo.Address = addr
go stor.talkMaster(ctx) wg := sync.WaitGroup{}
err = Serve(ctx, l, stor) // XXX -> go ? serveCtx, serveCancel := context.WithCancel(ctx)
return err // XXX err ctx wg.Add(1)
go func() {
defer wg.Done()
err = Serve(serveCtx, l, stor)
_ = err // XXX what to do with err ?
}()
// XXX oversee both master and server and wait ? err = stor.talkMaster(ctx)
serveCancel()
wg.Wait()
return err // XXX err ctx
} }
// talkMaster connects to master, announces self and receives notifications and commands // talkMaster connects to master, announces self and receives notifications and commands
// XXX and notifies master about ? (e.g. StartOperation -> NotifyReady) // XXX and notifies master about ? (e.g. StartOperation -> NotifyReady)
// it tries to persist master link reconnecting as needed // it tries to persist master link reconnecting as needed
func (stor *Storage) talkMaster(ctx context.Context) { //
// it always returns an error - either due to cancel or command from master to shutdown
func (stor *Storage) talkMaster(ctx context.Context) error {
// XXX errctx
for { for {
fmt.Printf("stor: master(%v): connecting\n", stor.masterAddr) // XXX info fmt.Printf("stor: master(%v): connecting\n", stor.masterAddr) // XXX info
err := stor.talkMaster1(ctx) err := stor.talkMaster1(ctx)
fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err) fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err)
// TODO if err = shutdown -> return
// XXX handle shutdown command from master // XXX handle shutdown command from master
// throttle reconnecting / exit on cancel // throttle reconnecting / exit on cancel
select { select {
case <-ctx.Done(): case <-ctx.Done():
return return ctx.Err()
// XXX 1s hardcoded -> move out of here // XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second): case <-time.After(1*time.Second):
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment