Commit 60abb484 authored by Kirill Smelkov

.

parent b0b326ef
......@@ -48,6 +48,8 @@ type Master struct {
// master manages node and partition tables and broadcasts their updates
// to all nodes in the cluster
notifyWG sync.WaitGroup // nodeTab/partTab updates are proxied by per-peer goroutine
notifyTab map[proto.NodeID]chan _ΔClusterState // registered in notifyTab XXX ^^^
// last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ?
......@@ -62,12 +64,41 @@ type Master struct {
// channels from workers directly serving peers to main driver
nodeCome chan nodeCome // node connected XXX -> acceptq?
// nodeLeave chan nodeLeave // node disconnected XXX -> don't need
// nodeLeave chan nodeLeave // node disconnected XXX -> don't need ?
// so tests could override
monotime func() float64
}
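// Illustrative sketch, not part of this commit: because the clock is injected
// as a field, a test can replace it to make IdTime values deterministic.
// useFakeClock is a hypothetical helper a test suite could use.
func useFakeClock(m *Master) {
	t := 0.0
	m.monotime = func() float64 { t += 0.01; return t } // 0.01, 0.02, ... per identification
}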
// nodeCome represents "node connects" event.
type nodeCome struct {
req *neonet.Request
idReq *proto.RequestIdentification // we received this identification request
}
/*
// nodeLeave represents "node disconnects" event.
type nodeLeave struct {
node *neo.PeerNode
}
*/
// _ΔClusterState represents δnodeTab/δpartTab/δClusterState.
type _ΔClusterState interface { δClusterState() }
type _ΔNodeTab struct {
proto.NodeInfo // new value for change of 1 nodeTab entry
}
type _ΔPartTab struct {
// XXX
}
type _ΔStateCode struct {
// XXX
}
func (_ *_ΔNodeTab) δClusterState() {}
func (_ *_ΔPartTab) δClusterState() {}
func (_ *_ΔStateCode) δClusterState() {}
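// Illustrative sketch, not part of this commit: the _Δ* types form a closed
// sum over the δClusterState marker interface, so a consumer (e.g. a per-peer
// notifier) can recover the concrete change with a type switch and map it to
// the corresponding NEO message. δToMsg is a hypothetical helper; the
// _ΔPartTab/_ΔStateCode payloads are still XXX above, so those arms stay TODO.
func δToMsg(δ _ΔClusterState) proto.Msg {
	switch δ := δ.(type) {
	case *_ΔNodeTab:
		// one changed nodeTab entry -> NotifyNodeInformation with a 1-element NodeList
		return &proto.NotifyNodeInformation{NodeList: []proto.NodeInfo{δ.NodeInfo}}
	case *_ΔPartTab:
		return &proto.SendPartitionTable{} // TODO fill PTid/NumReplicas/RowList
	case *_ΔStateCode:
		return nil // TODO cluster-state notification once _ΔStateCode carries the new state
	default:
		panic("δToMsg: unexpected event type")
	}
}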
// NewMaster creates new master node.
//
......@@ -148,7 +179,8 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
}
// update nodeTab with self
m.node.State.NodeTab.Update(m.node.MyInfo)
m.updateNodeTab(ctx, m.node.MyInfo)
// m.node.State.NodeTab.Update(m.node.MyInfo)
// wrap listener with link / identification hello checker
lli := xneo.NewListener(neonet.NewLinkListener(l))
......@@ -205,7 +237,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
err = m.runMain(ctx)
serveCancel()
xio.LClose(ctx, lli)
xio.LClose(ctx, lli) // XXX here ok?
wg.Wait()
return err
......@@ -315,6 +347,7 @@ loop:
select {
// new connection comes in
case n := <-m.nodeCome:
// FIXME if identify=ok -> subscribe to δ(nodeTab) and send initial nodeTab right after accept (accept should do it?)
node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
if node == nil {
......@@ -351,7 +384,8 @@ loop:
if !xcontext.Canceled(r.err) {
r.stor.ResetLink(ctx)
r.stor.SetState(proto.DOWN)
m.updateNodeState(ctx, r.stor, proto.DOWN)
// XXX stop sending nodeTab/partTab updates to this node
}
} else {
......@@ -435,7 +469,7 @@ loop2:
if !xcontext.Canceled(r.err) {
r.stor.ResetLink(ctx)
r.stor.SetState(proto.DOWN)
m.updateNodeState(ctx, r.stor, proto.DOWN)
}
case <-done:
......@@ -453,7 +487,7 @@ loop2:
// XXX recheck logic is ok for when starting existing cluster
for _, stor := range m.node.State.NodeTab.StorageList() {
if stor.State == proto.PENDING {
stor.SetState(proto.RUNNING)
m.updateNodeState(ctx, stor, proto.RUNNING)
}
}
......@@ -611,7 +645,7 @@ loop:
if !xcontext.Canceled(v.err) {
v.stor.ResetLink(ctx)
v.stor.SetState(proto.DOWN)
m.updateNodeState(ctx, v.stor, proto.DOWN)
}
// check partTab is still operational
......@@ -660,7 +694,7 @@ loop2:
if !xcontext.Canceled(v.err) {
v.stor.ResetLink(ctx)
v.stor.SetState(proto.DOWN)
m.updateNodeState(ctx, v.stor, proto.DOWN)
}
case <-done:
......@@ -849,7 +883,7 @@ func storCtlService(ctx context.Context, stor *xneo.PeerNode) (err error) {
slink := stor.Link()
defer task.Runningf(&ctx, "%s: stor service", slink.RemoteAddr())(&err)
// XXX send nodeTab ?
// XXX send nodeTab ? -> yes
// XXX send clusterInformation ?
// XXX current neo/py does StartOperation / NotifyReady as separate
......@@ -941,6 +975,35 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// ----------------------------------------
// XXX place
// called from main master process. XXX
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
node := m.node.State.NodeTab.Update(nodeInfo)
event := &_ΔNodeTab{nodeInfo}
// XXX locking
for nid, ch := range m.notifyTab {
// TODO change limiting by buffer size to limiting by time -
// - i.e. detach peer if event queue grows more than 30s of time.
select {
case ch <- event:
continue // ok
default:
}
log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
// TODO ^^^
}
return node
}
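// Illustrative sketch, not part of this commit: one possible shape for the
// "detach it" TODO above - unregister the peer's update queue and close it,
// so that whoever drains the queue (see keepPeerUpdated / the notifier
// goroutine) observes the close and stops. detachPeer is a hypothetical name
// and, like updateNodeTab itself, would need the XXX locking resolved.
func (m *Master) detachPeer(ctx context.Context, nid proto.NodeID) {
	ch, ok := m.notifyTab[nid]
	if !ok {
		return // already detached
	}
	delete(m.notifyTab, nid)
	close(ch)
	log.Warningf(ctx, "detached slow peer %s", nid)
}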
// XXX place
// XXX doc
func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) {
nodei := node.NodeInfo
nodei.State = state
m.updateNodeTab(ctx, nodei)
}
// keepPeerUpdated sends cluster state updates to peer on the link.
func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (err error) {
// link should be already in parent ctx (XXX and closed on cancel ?)
......@@ -989,7 +1052,7 @@ func (m *Master) keepPeerUpdated(ctx context.Context, link *neonet.NodeLink) (er
return err
}
err = link.Send1(&proto.SendPartitionTable{
err = link.Send1(&proto.SendPartitionTable{ // XXX to C, but not to S?
PTid: ptid,
NumReplicas: ptnr,
RowList: ptv,
......@@ -1112,8 +1175,24 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *xneo.PeerNode,
IdTime: proto.IdTime(m.monotime()),
}
node = m.node.State.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
// node = m.node.State.NodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
node = m.updateNodeTab(ctx, nodeInfo)
node.SetLink(n.req.Link())
// make nodeTab/partTab snapshot to push to accepted node and subscribe it for updates
state0 := m.node.State.Snapshot()
// TODO change limiting by buffer size -> to limiting by time
// (see updateNodeTab for details)
updateq := make(chan _ΔClusterState, 1024)
m.notifyTab[node.NID] = updateq
// XXX go not here - only after initial state is sent out
/*
m.notifyWG.Add(1)
go func() {
defer m.notifyWG.Done()
}()
*/
return node, accept
}
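// Illustrative sketch, not part of this commit: the goroutine commented out
// above must start only after the state0 snapshot has been pushed to the
// peer; once started, it could look like the following. sendΔ is a
// hypothetical callback that encodes one _ΔClusterState into the
// corresponding message on the peer's link (compare keepPeerUpdated above).
func (m *Master) notifyPeer(ctx context.Context, nid proto.NodeID, updateq <-chan _ΔClusterState, sendΔ func(context.Context, _ΔClusterState) error) {
	defer m.notifyWG.Done() // paired with the notifyWG.Add(1) done before `go`
	for {
		select {
		case <-ctx.Done():
			return
		case δ, ok := <-updateq:
			if !ok {
				return // master detached this peer and closed its queue
			}
			if err := sendΔ(ctx, δ); err != nil {
				log.Warningf(ctx, "notify %s: %s", nid, err)
				return
			}
		}
	}
}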
......
......@@ -200,11 +200,14 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
// master pushes whole nodeTab and partTab to us right after identification
// XXX temp hack
// if node.MyInfo.Type == proto.CLIENT {
// nodeTab
mnt := proto.NotifyNodeInformation{}
_, err = mlink.Expect1(&mnt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
return fmt.Errorf("after identification: expect nodeTab: %w", err)
}
// partTab (not to S and secondary M(?))
......@@ -214,7 +217,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
mpt := proto.SendPartitionTable{}
_, err = mlink.Expect1(&mpt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
return fmt.Errorf("after identification: expect partTab: %w", err)
}
pt = xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "<- parttab:\n%s", pt)
......@@ -236,6 +239,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
if err != nil {
return err
}
// }
wg := xsync.NewWorkGroup(ctx)
// receive and handle notifications from master
......
......@@ -71,26 +71,12 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
// ----------------------------------------
// XXX move -> master.go
// event: node connects
type nodeCome struct {
req *neonet.Request
idReq *proto.RequestIdentification // we received this identification request
}
/*
// event: node disconnects
type nodeLeave struct {
node *neo.Node
}
*/
// reject sends rejective identification response and closes associated link
func reject(ctx context.Context, req *neonet.Request, resp proto.Msg) {
// XXX cancel on ctx?
// log.Info(ctx, "identification rejected") ?
err1 := req.Reply(resp)
// XXX req.Close() ?
err2 := req.Link().Close()
err := xerr.Merge(err1, err2)
if err != nil {
......@@ -108,9 +94,6 @@ func goreject(ctx context.Context, wg *sync.WaitGroup, req *neonet.Request, resp
// accept replies with acceptive identification response
// XXX spawn ping goroutine from here?
func accept(ctx context.Context, req *neonet.Request, resp proto.Msg) error {
// XXX cancel on ctx
err1 := req.Reply(resp)
return err1 // XXX while trying to work on single conn
//err2 := conn.Close()
//return xerr.First(err1, err2)
return req.Reply(resp)
// XXX req.Close() ?
}
......@@ -304,7 +304,16 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
// --- serve incoming connections from other nodes ---
// identify processes identification request from connected peer.
func (stor *Storage) identify(idReq *proto.RequestIdentification) (proto.Msg, bool) {
func (stor *Storage) identify(ctx context.Context, idReq *proto.RequestIdentification) (proto.Msg, bool) {
idResp, ok := stor.identify_(idReq)
if ok {
log.Info(ctx, "accepting identification")
} else {
log.Infof(ctx, "rejecting identification (%s)", idResp.(*proto.Error).Message)
}
return idResp, ok
}
func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, bool) {
// XXX stub: we accept clients and don't care about their NID
if idReq.NodeType != proto.CLIENT {
return &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}, false
......@@ -338,7 +347,7 @@ func (stor *Storage) withWhileOperational(ctx context.Context) (context.Context,
opCtx := stor.opCtx
stor.opMu.Unlock()
return xcontext.Merge(ctx, opCtx)
return xcontext.Merge/*Cancel*/(ctx, opCtx)
}
......@@ -349,9 +358,10 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
defer xio.CloseWhenDone(ctx, link)()
// first process identification
idResp, ok := stor.identify(idReq)
// XXX just req.Reply(idResp) + return if !ok
idResp, ok := stor.identify(ctx, idReq)
if !ok {
reject(ctx, req, idResp) // XXX log?
reject(ctx, req, idResp)
return nil
}
......@@ -361,7 +371,6 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
}
// client passed identification, now serve other requests
log.Info(ctx, "identification accepted") // FIXME must be in identify?
// rederive ctx to be also cancelled if M tells us StopOperation
ctx, cancel := stor.withWhileOperational(ctx)
......
......@@ -446,6 +446,15 @@ func (t0 *tEnv) NewCluster_MS(name string, Sback storage.Backend) *tCluster {
YourNID: proto.NID(proto.STORAGE, 1),
}))
// M sends nodeTab to S
t.Expect("m-s", conntx("m:2", "s:2", 0, &proto.NotifyNodeInformation{
IdTime: proto.IdTimeNone, // XXX ?
NodeList: []proto.NodeInfo{
nodei("m:1", proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone),
nodei("s:1", proto.STORAGE, 1, proto.PENDING, 0.01),
},
}))
// M starts recovery on S
t.Expect("m-s", conntx("m:2", "s:2", 0, &proto.Recovery{}))
t.Expect("m-s", conntx("s:2", "m:2", 0, &proto.AnswerRecovery{
......
......@@ -69,7 +69,7 @@ type NodeTable struct {
localNode *Node
nodev []*PeerNode // all nodes
notifyv []chan proto.NodeInfo // subscribers
// notifyv []chan proto.NodeInfo // subscribers
}
//trace:event traceNodeChanged(nt *NodeTable, n *PeerNode)
......@@ -139,7 +139,7 @@ func (nt *NodeTable) Update(nodeInfo proto.NodeInfo) *PeerNode {
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo)
// nt.notify(node.NodeInfo) XXX kill
return node
}
......@@ -156,12 +156,14 @@ func (nt *NodeTable) StorageList() []*PeerNode {
}
/*
// XXX doc
func (n *PeerNode) SetState(state proto.NodeState) {
n.State = state
traceNodeChanged(n.nodeTab, n)
n.nodeTab.notify(n.NodeInfo)
}
*/
......@@ -177,6 +179,7 @@ func (nt *NodeTable) String() string {
return buf.String()
}
/*
// ---- subscription to nodetab updates ----
// XXX used only by M -> move into M?
......@@ -258,6 +261,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []proto.NodeInfo, unsubscribe
return ch, unsubscribe
}
*/
// ---- peer link ----
......@@ -301,7 +305,7 @@ func (p *PeerNode) ResetLink(ctx context.Context) {
p.linkMu.Lock()
link := p.link
p.link = nil
p.dialing = nil // XXX what if dialing is in progress?
p.dialing = nil // XXX what if dialing is in progress? -> cancel dialing with err?
p.linkMu.Unlock()
if link != nil {
......
......@@ -60,9 +60,6 @@ type Node struct {
// NodeTab *NodeTable // information about nodes in the cluster
// PartTab *PartitionTable // information about data distribution in the cluster
// ClusterState proto.ClusterState // master idea about cluster state
// // should be set by user so Node can notify when master tells this node to shutdown
// OnShutdown func()
}
// NewNode creates new node.
......