Commit 336fc1be authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 043561f4
...@@ -20,11 +20,25 @@ ...@@ -20,11 +20,25 @@
package neo package neo
// master node // master node
// XXX master organization // Master organization
//
// Master is organized as follows:
//
// - main task that controls whole logic of master working. It spawns
// subtasks to implement that logic and communicate with the subtask via channels. XXX
// Main is the only mutator of nodeTab, partTab, etc.
//
// - accept task that accepts incoming connections and hands them over to main
// via nodeComeq.
// //
// - main goroutine that is the only mutator of nodeTab, partTab, etc
// - per peer workers are spawned that interact with main via channels // - per peer workers are spawned that interact with main via channels
// - δnodeTab, δpartTab updates are proxied to peer by another per-peer goroutine // - δnodeTab, δpartTab updates are proxied to peer by another per-peer task
//
// XXX
//
// XXX
// master manages node and partition tables and broadcast their updates
// to all connected nodes.
import ( import (
"context" "context"
...@@ -52,17 +66,21 @@ import ( ...@@ -52,17 +66,21 @@ import (
type Master struct { type Master struct {
node *xneo.Node node *xneo.Node
// main Runs under runCtx // whole Runs runs under runCtx
runCtx context.Context runCtx context.Context
// master manages node and partition tables and broadcast their updates // "global" workgroup under which main, accept and tasks, that should
// to all connected nodes. δnodeTab/δpartTab updates are proxied to // last for whole Run time, are spawned.
// a peer by per-peer goroutine reading from .notifyTab[peer.nid] channel. mainWG *xsync.WorkGroup
notifyWG sync.WaitGroup // XXX -> runWG ?
// notifyTab map[proto.NodeID]chan _ΔClusterState // XXX -> struct peerWG{.wg, .notifyq} ? // main <- node come or go
// XXX ^^^ -> peerTab ? XXX make it part of .nodeTab through PeerNode.private? nodeComeq chan nodeCome // main <- accept "node connected"
// XXX ^^^ -> peerWorkTab ? nodeLeaveq chan nodeLeave // main <- peerWG.wait "node (should be) disconnected"
peerWorkTab map[proto.NodeID]*peerWork
// in addition to nodeTab (which keeps information about a node) tasks
// that are specific to a peer are organized around peerWorkTab[peer.nid].
peerWorkTab map[proto.NodeID]*_MasteredPeer
// last allocated oid & tid // last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ? // XXX how to start allocating oid from 0, not 1 ?
...@@ -75,36 +93,38 @@ type Master struct { ...@@ -75,36 +93,38 @@ type Master struct {
ctlStop chan chan struct{} // request to stop cluster ctlStop chan chan struct{} // request to stop cluster
ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ? ctlShutdown chan chan error // request to shutdown cluster XXX with ctx ?
// channels from workers directly serving peers to main driver
nodeComeq chan nodeCome // node connected XXX -> acceptq?
// nodeLeaveq chan nodeLeave // node disconnected XXX -> don't need ?
// so tests could override // so tests could override
monotime func() float64 monotime func() float64
} }
// nodeCome represents "node connects" event. // nodeCome represents "node connects" event.
// XXX main <- accept
type nodeCome struct { type nodeCome struct {
req *neonet.Request req *neonet.Request
idReq *proto.RequestIdentification // we received this identification request idReq *proto.RequestIdentification // we received this identification request
} }
/* // nodeLeave represents "node (should be) disconnected" event.
// nodeLeave represents "node disconnects" event.
type nodeLeave struct { type nodeLeave struct {
node *neo.PeerNode node *xneo.PeerNode
} }
*/
// peerWork represents context for all tasks related to one peer. // _MasteredPeer represents context for all tasks related to one peer driven by master.
type peerWork struct { //
// .notify
// .wait (run under mainWG)
type _MasteredPeer struct {
peer *xneo.PeerNode // XXX naming -> node ?
// all tasks are spawned under wg. If any task fails - whole wg is canceled. // all tasks are spawned under wg. If any task fails - whole wg is canceled.
wg *xsync.WorkGroup wg *xsync.WorkGroup
// XXX +cancel
// snapshot of nodeTab/partTab/stateCode when peer was accepted by main. // snapshot of nodeTab/partTab/stateCode when peer was accepted by main.
state0 *xneo.ClusterStateSnapshot state0 *xneo.ClusterStateSnapshot
// main sends δnodeTab/δpartTab/δstateCode to notifyq. // main -> peerWG.notify δnodeTab/δpartTab/δstateCode.
notifyq chan _ΔClusterState notifyq chan _ΔClusterState
// notifyqOverflow becomes ready if main detects that peer is to slow to consume updates // notifyqOverflow becomes ready if main detects that peer is to slow to consume updates
// XXX no need? (peer.notify is canceled via peerWork.cancel)
notifyqOverflow chan struct{} notifyqOverflow chan struct{}
} }
...@@ -189,10 +209,12 @@ func (m *Master) setClusterState(state proto.ClusterState) { ...@@ -189,10 +209,12 @@ func (m *Master) setClusterState(state proto.ClusterState) {
func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() // so that .runCtx is canceled if we return due to an error defer cancel() // so that .runCtx is canceled if we return due to an error
// XXX ^^^ not needed - we first must wait for all spawned subtasks
addr := l.Addr() addr := l.Addr()
defer task.Runningf(&ctx, "master(%v)", addr)(&err) defer task.Runningf(&ctx, "master(%v)", addr)(&err)
m.runCtx = ctx m.runCtx = ctx
m.mainWG = xsync.NewWorkGroup(m.runCtx)
// update our master & serving address in node // update our master & serving address in node
...@@ -211,17 +233,12 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -211,17 +233,12 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
// update nodeTab with self // update nodeTab with self
m.updateNodeTab(ctx, m.node.MyInfo) m.updateNodeTab(ctx, m.node.MyInfo)
// m.node.State.NodeTab.Update(m.node.MyInfo)
// wrap listener with link / identificaton hello checker // wrap listener with link / identificaton hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l)) lli := xneo.NewListener(neonet.NewLinkListener(l))
// accept incoming connections and pass them to main driver // accept: accept incoming connections and pass them to main driver
wg := sync.WaitGroup{} m.mainWG.Go(func(ctx context.Context) (err error) {
serveCtx, serveCancel := context.WithCancel(ctx)
wg.Add(1)
go func(ctx context.Context) (err error) {
defer wg.Done()
defer task.Running(&ctx, "accept")(&err) defer task.Running(&ctx, "accept")(&err)
// XXX dup in storage // XXX dup in storage
...@@ -262,21 +279,26 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -262,21 +279,26 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
continue continue
} }
} }
}(serveCtx) })
// main driving logic // main: main driving logic
err = m.runMain(ctx) m.mainWG.Go(m.main)
serveCancel() err = m.mainWG.Wait()
xio.LClose(ctx, lli) // XXX here ok? // change `... canceled` to just canceled?
wg.Wait() // (e.g. `master: accept: canceled` or `master: main: canceled` -> `master: canceled`)
if ctx.Err() != nil {
err = ctx.Err()
}
xio.LClose(ctx, lli) // XXX here ok? (probbly not)
return err return err
} }
// runMain is the process that implements main master cluster management logic: node tracking, cluster // main is the process that implements main master cluster management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes, etc. // state updates, scheduling data movement between storage nodes, etc.
func (m *Master) runMain(ctx context.Context) (err error) { func (m *Master) main(ctx context.Context) (err error) {
defer task.Running(&ctx, "main")(&err) defer task.Running(&ctx, "main")(&err)
// NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state // NOTE Run's goroutine is the only mutator of nodeTab, partTab and other cluster state
...@@ -1236,8 +1258,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1236,8 +1258,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
node.SetLink(n.req.Link()) node.SetLink(n.req.Link())
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates // make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
m.peerWorkTab[node.NID] = &peerWork{ m.peerWorkTab[node.NID] = &_MasteredPeer{
wg: xsync.NewWorkGroup(m.runCtx), wg: xsync.NewWorkGroup(m.runCtx), // XXX wrong -> per peer ctx (derived from runCtx)
state0: m.node.State.Snapshot(), state0: m.node.State.Snapshot(),
// TODO change limiting by buffer size -> to limiting by time // TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details) // (see updateNodeTab for details)
...@@ -1250,7 +1272,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode, ...@@ -1250,7 +1272,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
// accept sends acceptance to just identified peer, sends nodeTab and partTab // accept sends acceptance to just identified peer, sends nodeTab and partTab
// and spawns task to proxy their updates to the peer. XXX // and spawns task to proxy their updates to the peer. XXX
func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, idReq *neonet.Request, idResp proto.Msg) error { func (m *Master) accept(p *_MasteredPeer, idReq *neonet.Request, idResp proto.Msg) error {
// XXX errctx?
err := idReq.Reply(idResp) err := idReq.Reply(idResp)
if err != nil { if err != nil {
return fmt.Errorf("send accept: %w", err) return fmt.Errorf("send accept: %w", err)
...@@ -1258,17 +1281,17 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1258,17 +1281,17 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
// XXX idReq close? // XXX idReq close?
// send initial state snapshot to accepted node // send initial state snapshot to accepted node
link := peer.Link() // XXX -> idReq.Link() instead? link := p.peer.Link() // XXX -> idReq.Link() instead?
// nodeTab // nodeTab
err = link.Send1(&state0.NodeTab) err = link.Send1(&p.state0.NodeTab)
if err != nil { if err != nil {
return fmt.Errorf("send nodeTab: %w", err) return fmt.Errorf("send nodeTab: %w", err)
} }
// partTab (not to S until cluster is RUNNING) // partTab (not to S until cluster is RUNNING)
if !(peer.Type == proto.STORAGE && state0.Code != proto.ClusterRunning) { if !(peer.Type == proto.STORAGE && state0.Code != proto.ClusterRunning) {
err = link.Send1(&state0.PartTab) err = link.Send1(&p.state0.PartTab)
if err != nil { if err != nil {
return fmt.Errorf("send partTab: %w", err) return fmt.Errorf("send partTab: %w", err)
} }
...@@ -1276,27 +1299,31 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1276,27 +1299,31 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
// XXX send clusterState too? (NEO/py does not send it) // XXX send clusterState too? (NEO/py does not send it)
var w *peerWork // XXX stub <- = .peerWorkTab[peer.NID] set from main // spawn p.notify to proxy δnodeTab/δpartTab/δcluterState to peer
p.wg.Go(p.notify)
return nil
}
// go proxy δstate ... XXX // notify proxies δnodeTab/δpeerTab/δClusterState update to the peer.
// XXX under which wg? -> under per-peer wg func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
w.wg.Go(func(ctx context.Context) (err error) { defer task.Runningf(&ctx, "notify")(&err)
defer task.Runningf(&ctx, "send cluster updates")(&err)
stateCode := state0.Code stateCode := p.state0.Code
// XXX vvv right?
return xxcontext.WithCloseOnErrCancel(ctx, link, func() error {
for { for {
var δstate _ΔClusterState var δstate _ΔClusterState
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() // XXX signal to nodeLeaveq ? return ctx.Err()
case <-w.notifyqOverflow: case <-p.notifyqOverflow:
// XXX err -> ? XXX signal to nodeLeaveq ? // XXX err -> ?
return fmt.Errorf("detaching (peer is too slow to consume updates)") return fmt.Errorf("detaching (peer is too slow to consume updates)")
case δstate = <-w.notifyq: // XXX could be also closed? case δstate = <-p.notifyq: // XXX could be also closed?
} }
var msg proto.Msg var msg proto.Msg
...@@ -1340,6 +1367,20 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot, ...@@ -1340,6 +1367,20 @@ func (m *Master) accept(peer *xneo.PeerNode, state0 *xneo.ClusterStateSnapshot,
return nil return nil
} }
// waitAll waits for all tasks related to peer to complete and then notifies
// main that peer node should go. It is spawned under mainWG.
// XXX naming -> wait?
func (p *_MasteredPeer) waitAll(_ context.Context) error {
// don't take our ctx into account - it is ~ runCtx and should be
// parent of context under which per-peer tasks are spawned. This way
// if runCtx is canceled -> any per-peer ctx should be canceled too and
// wg.Wait should not block.
err := p.wg.Wait()
m.nodeLeaveq <- nodeLeave{p, err} // XXX detect if if main is already done
return nil // XXX or ctx.Err() ?
}
// allocNID allocates new node ID for a node of kind nodeType. // allocNID allocates new node ID for a node of kind nodeType.
// XXX it is bad idea for master to assign node ID to coming node // XXX it is bad idea for master to assign node ID to coming node
// -> better nodes generate really unique UUID themselves and always show with them // -> better nodes generate really unique UUID themselves and always show with them
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment