Commit d95400c1 authored by Kirill Smelkov

master: WIP: send initial cluster-state snapshot and δstate updates to accepted peers

parent 367842e1
@@ -99,7 +99,7 @@ type _ΔPartTab struct {
 	// XXX
 }
 type _ΔStateCode struct {
-	// XXX
+	proto.ClusterState // new value
 }

 func (_ *_ΔNodeTab) δClusterState() {}
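
The _Δ* types above form a closed sum type: the unexported δClusterState marker method pins exactly which concrete deltas may travel over a _ΔClusterState channel. A minimal, self-contained sketch of the idiom with stand-in payloads (only the _ΔClusterState / δClusterState naming comes from this commit):

package main

import "fmt"

// _ΔClusterState is implemented by every cluster-state delta type;
// the unexported marker method closes the set of implementations.
type _ΔClusterState interface{ δClusterState() }

type _ΔNodeTab struct{}              // a changed nodeTab entry (details elided)
type _ΔStateCode struct{ State int } // stand-in for proto.ClusterState

func (*_ΔNodeTab) δClusterState()   {}
func (*_ΔStateCode) δClusterState() {}

func main() {
	updateq := make(chan _ΔClusterState, 2)
	updateq <- &_ΔStateCode{State: 1}
	updateq <- &_ΔNodeTab{}
	close(updateq)

	for δ := range updateq {
		switch δ := δ.(type) {
		case *_ΔStateCode:
			fmt.Println("cluster state ->", δ.State)
		case *_ΔNodeTab:
			fmt.Println("nodeTab changed")
		}
	}
}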
@@ -161,6 +161,7 @@ func (m *Master) setClusterState(state proto.ClusterState) {
 	m.node.State.Code.Set(state)

 	// TODO notify subscribers
+	// <- _ΔStateCode{state}
 }
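
The commented send sketches the intended wiring: setClusterState should push a _ΔStateCode delta into every per-peer notify queue registered in m.notifyTab (created in identify further down). A minimal, self-contained sketch of such a fan-out; the non-blocking send and the lagging-peer handling are assumptions, not something this commit decides:

package main

import "fmt"

type _ΔClusterState interface{ δClusterState() }

type _ΔStateCode struct{ State int }

func (*_ΔStateCode) δClusterState() {}

// notifyAll fans one delta out to every registered per-peer queue.
// The non-blocking send keeps one stuck peer from stalling the master;
// dropping the update on overflow is an assumption made for this sketch.
func notifyAll(notifyTab map[int32]chan _ΔClusterState, δ _ΔClusterState) {
	for nid, notifyq := range notifyTab {
		select {
		case notifyq <- δ:
		default:
			fmt.Printf("peer %d lagging: queue full - drop/disconnect it\n", nid)
		}
	}
}

func main() {
	notifyTab := map[int32]chan _ΔClusterState{1: make(chan _ΔClusterState, 4)}
	notifyAll(notifyTab, &_ΔStateCode{State: 1})
	fmt.Println(len(notifyTab[1]), "update queued")
}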
@@ -368,7 +369,7 @@ loop:
 			go func() {
 				defer wg.Done()
-				err := accept(ctx, n.req, resp)
+				err := m.accept(ctx, n.req, resp)
 				if err != nil {
 					recovery <- storRecovery{stor: node, err: err}
 					return
@@ -378,6 +379,8 @@ loop:
 				storCtlRecovery(ctx, node, recovery)
 			}()
+			// XXX <-m.nodeLeave
+
 		// a storage node came through recovery - let's see whether
 		// ptid ↑ and if so we should take partition table from there
 		//
@@ -393,6 +396,7 @@ loop:
 				r.stor.ResetLink(ctx)
 				m.updateNodeState(ctx, r.stor, proto.DOWN)
 				// XXX stop sending nodeTab/partTab updates to this node
+				// XXX .nodeLeave <- r.stor ?
 			}
 		} else {
@@ -477,6 +481,7 @@ loop2:
 			if !xcontext.Canceled(r.err) {
 				r.stor.ResetLink(ctx)
 				m.updateNodeState(ctx, r.stor, proto.DOWN)
+				// XXX nodeLeave <-
 			}

 		case <-done:
@@ -617,7 +622,7 @@ loop:
 			go func() {
 				defer wg.Done()
-				err := accept(ctx, n.req, resp)
+				err := m.accept(ctx, n.req, resp)
 				if err != nil {
 					verify <- storVerify{stor: node, err: err}
 					return
@@ -626,7 +631,7 @@ loop:
 				storCtlVerify(ctx, node, m.node.State.PartTab, verify)
 			}()
-		/*
+		/* XXX reenable
 		case n := <-m.nodeLeave:
 			n.node.SetState(proto.DOWN)
@@ -653,6 +658,7 @@ loop:
 			if !xcontext.Canceled(v.err) {
 				v.stor.ResetLink(ctx)
 				m.updateNodeState(ctx, v.stor, proto.DOWN)
+				// XXX nodeLeave <-
 			}

 			// check partTab is still operational
@@ -702,6 +708,7 @@ loop2:
 			if !xcontext.Canceled(v.err) {
 				v.stor.ResetLink(ctx)
 				m.updateNodeState(ctx, v.stor, proto.DOWN)
+				// XXX nodeLeave <-
 			}

 		case <-done:
@@ -826,7 +833,7 @@ loop:
 			go func() {
 				defer wg.Done()
-				err = accept(ctx, n.req, resp)
+				err = m.accept(ctx, n.req, resp)
 				if err != nil {
 					serviced <- serviceDone{node: node, err: err}
 					return
@@ -850,7 +857,7 @@ loop:
 			_ = d

 		/*
-		// XXX who sends here?
+		// XXX reenable? -> no - do it ^^^ in <-serviced (?)
 		case n := <-m.nodeLeave:
 			n.node.SetState(proto.DOWN)
@@ -923,9 +930,8 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
 			//println(".")

 		case <-ctx.Done():
-			// XXX also send StopOperation?
-			// XXX close link?
-			return ctx.Err() // XXX ok?
+			// FIXME also send StopOperation
+			return ctx.Err()
 		}
 	}
 }
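
For the FIXME above, the fix could take the following shape: notify the storage before unwinding. Treating proto.StopOperation as the right message and accepting best-effort (ignored-error) delivery are assumptions here, not something this commit settles:

		case <-ctx.Done():
			// best-effort: ask the storage to leave operational state before
			// we abandon it; the send error is ignored since we are unwinding
			// anyway (assumed approach, not confirmed by the diff)
			_ = stor.Link().Send1(&proto.StopOperation{})
			return ctx.Err()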
@@ -935,8 +941,8 @@ func (m *Master) serveClient(ctx context.Context, cli *xneo.PeerNode) (err error) {
 	clink := cli.Link()
 	defer task.Runningf(&ctx, "%s: client service", clink.RemoteAddr())(&err)

-	wg, ctx := errgroup.WithContext(ctx)
-	defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink?
+	wg, ctx := errgroup.WithContext(ctx)  // XXX -> sync.WorkGroup
+	defer xio.CloseWhenDone(ctx, clink)() // XXX -> cli.ResetLink? (better not here)

 	// FIXME send initial nodeTab and partTab before starting serveClient1
 	// (move those initial sends from keepPeerUpdated to .accept)
@@ -1009,6 +1015,7 @@ func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state
 	m.updateNodeTab(ctx, nodei)
 }

+/*
 // keepPeerUpdated sends cluster state updates to peer on the link.
 func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
 	// link should be already in parent ctx (XXX and closed on cancel ?)
@@ -1093,6 +1100,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
 		}
 	}
 }
+*/

 // ----------------------------------------
@@ -1188,8 +1196,8 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 	state0 := m.node.State.Snapshot()
 	// TODO change limiting by buffer size -> to limiting by time
 	// (see updateNodeTab for details)
-	updateq := make(chan _ΔClusterState, 1024)
-	m.notifyTab[node.NID] = updateq
+	notifyq := make(chan _ΔClusterState, 1024)
+	m.notifyTab[node.NID] = notifyq
 	// XXX go not here - only after initial state is sent out
 	/*
 	m.notifyWG.Add(1)
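
On the TODO above (limit slow peers by time rather than by a 1024-element buffer), one possible shape, as a self-contained sketch; sendTimeout and the disconnect-on-timeout reaction are assumptions:

package notify

import (
	"errors"
	"time"
)

type _ΔClusterState interface{ δClusterState() }

var errPeerTooSlow = errors.New("peer did not consume update in time")

// sendδ pushes one delta to a peer queue but waits at most sendTimeout,
// so peer slowness is bounded by time instead of by buffer capacity.
func sendδ(notifyq chan<- _ΔClusterState, δ _ΔClusterState, sendTimeout time.Duration) error {
	timer := time.NewTimer(sendTimeout)
	defer timer.Stop()
	select {
	case notifyq <- δ:
		return nil
	case <-timer.C:
		return errPeerTooSlow // caller would then drop/disconnect the peer
	}
}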
@@ -1201,6 +1209,87 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
 	return node, accept
 }

+func (m *Master) accept(peer *xneo.PeerNode, idReq *neonet.Request, idResp proto.Msg) error {
+	err := idReq.Reply(idResp)
+	if err != nil {
+		return fmt.Errorf("send accept: %w", err)
+	}
+
+	// XXX state0, notifyq and wg are created in identify and not yet plumbed through here (WIP)
+	link := peer.Link()
+
+	// send initial state snapshot to accepted node
+
+	// nodeTab
+	err = link.Send1(&state0.NodeTab) // = proto.NotifyNodeInformation{...}
+	if err != nil {
+		return fmt.Errorf("send nodeTab: %w", err)
+	}
+
+	// partTab (not to S until cluster is RUNNING)
+	if !(peer.NodeType == proto.STORAGE && state0.Code != proto.ClusterRunning) {
+		err = link.Send1(&state0.PartTab)
+		if err != nil {
+			return fmt.Errorf("send partTab: %w", err)
+		}
+	}
+	// XXX send clusterState too? (NEO/py does not send it)
+
+	// go proxy δstate ... XXX
+	// XXX under which wg? -> under per-peer wg
+	wg.Go(func(ctx context.Context) (err error) {
+		defer task.Runningf(&ctx, "send cluster updates")(&err)
+
+		stateCode := state0.Code
+
+		for {
+			var δstate _ΔClusterState
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+
+			case δstate = <-notifyq: // XXX could be also closed? (see sketch below)
+			}
+
+			var msg proto.Msg
+
+			switch δstate := δstate.(type) {
+			case *_ΔNodeTab:
+				msg = &proto.NotifyNodeInformation{
+					IdTime:   ...,
+					NodeList: []proto.NodeInfo{δstate.NodeInfo},
+				}
+
+			case *_ΔPartTab:
+				// don't send δpartTab to S unless cluster is RUNNING
+				if peer.NodeType == proto.STORAGE && stateCode != proto.ClusterRunning {
+					continue
+				}
+				msg = &proto.NotifyPartitionChanges{
+					// XXX
+					// PTid:        ...,
+					// NumReplicas: ...,
+					// CellList:    ...,
+				}
+				panic("TODO") // ^^^
+
+			case *_ΔStateCode:
+				stateCode = δstate.ClusterState
+				msg = &proto.NotifyClusterState{
+					State: stateCode,
+				}
+
+			default:
+				panic("bug")
+			}
+
+			err := link.Send1(msg)
+			if err != nil {
+				return err
+			}
+		}
+	})
+
+	return nil
+}
+
 // allocNID allocates a new node ID for a node of kind nodeType.
 // XXX it is a bad idea for master to assign node ID to the coming node
 // -> better: nodes generate really unique UUIDs themselves and always show up with them
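
The sender loop's "could be also closed?" question points at a second shutdown path: close a peer's notifyq when the peer leaves, and let the goroutine return when the receive reports a closed channel (δstate, ok = <-notifyq; return on !ok). A self-contained sketch of that bookkeeping under assumed names (notifier and drop are hypothetical stand-ins for the Master.notifyTab handling, which this commit does not implement):

package notify

import "sync"

type _ΔClusterState interface{ δClusterState() }

// notifier is a stand-in for the master's notifyTab bookkeeping.
type notifier struct {
	mu  sync.Mutex
	tab map[int32]chan _ΔClusterState // node ID -> per-peer update queue
}

// drop unregisters a leaving peer and closes its queue, so the per-peer
// sender goroutine can exit on channel close as well as on ctx cancel.
func (n *notifier) drop(nid int32) {
	n.mu.Lock()
	defer n.mu.Unlock()
	if q, ok := n.tab[nid]; ok {
		delete(n.tab, nid)
		close(q)
	}
}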
......
@@ -41,6 +41,34 @@ func (cs *ClusterState) IsOperational() bool {
 	return /* cs.Code == proto.ClusterRunning && */ cs.PartTab.OperationalWith(cs.NodeTab)
 }

+// ClusterStateSnapshot is a snapshot of ClusterState.
+// XXX place
+type ClusterStateSnapshot struct {
+	NodeTab proto.NotifyNodeInformation
+	PartTab proto.SendPartitionTable
+	Code    proto.ClusterState
+}
+
+func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
+	S := &ClusterStateSnapshot{Code: cs.Code}
+
+	nodev := cs.NodeTab.All()
+	nodeiv := make([]proto.NodeInfo, len(nodev))
+	for i, node := range nodev {
+		// NOTE .NodeInfo is data, not pointers - so it won't change after we copy it into nodeiv
+		nodeiv[i] = node.NodeInfo
+	}
+	S.NodeTab.IdTime = proto.IdTimeNone // XXX what here?
+	S.NodeTab.NodeList = nodeiv
+
+	S.PartTab.PTid = cs.PartTab.PTid
+	S.PartTab.NumReplicas = uint32(0) // FIXME hardcoded NumReplicas; NEO/py keeps this as n(replica)-1
+	S.PartTab.RowList = cs.PartTab.Dump()
+
+	return S
+}
+
 // Node provides base functionality underlying any NEO node.
 //
 // Every node knows how to talk to master and receives the master's idea about:
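
The NOTE inside Snapshot is the load-bearing detail: NodeInfo entries are copied by value, so a snapshot stays stable while the live nodeTab keeps mutating, and the snapshot fields are already the wire messages that accept hands to link.Send1. A tiny runnable illustration of the copy semantics, using stand-in types rather than the real xneo/proto ones:

package main

import "fmt"

// Stand-ins for xneo.Node / proto.NodeInfo, just to show the copy semantics.
type NodeInfo struct{ State string }
type Node struct{ NodeInfo }

func main() {
	n := &Node{NodeInfo{State: "RUNNING"}}
	snap := []NodeInfo{n.NodeInfo} // copied by value, as Snapshot does
	n.State = "DOWN"               // later mutation of the live nodeTab entry
	fmt.Println(snap[0].State)     // prints RUNNING - the snapshot is unaffected
}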
......