Commit 221be50a authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a317f201
...@@ -70,7 +70,7 @@ type _MasteredNode struct { ...@@ -70,7 +70,7 @@ type _MasteredNode struct {
operational bool // cache for Node.State.IsOperational() operational bool // cache for Node.State.IsOperational()
rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to mlink.Recv1 rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to mlink.Recv1
// XXX just use `.myInfo.NodeType == STORAGE` instead? // XXX just use `.node.Type == STORAGE` instead?
} }
type _MasteredNodeFlags int type _MasteredNodeFlags int
...@@ -104,13 +104,14 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -104,13 +104,14 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// //
// f is called on every reconnection to master after identification and protocol prologue. // f is called on every reconnection to master after identification and protocol prologue.
// //
// TalkMaster should be the only mutator of cluster state and .MyInfo.NID. XXX -> updateState // TalkMaster should be the only mutator of node state.
// //
// See top-level _MasteredNode overview for details. // See top-level _MasteredNode overview for details.
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) { func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// start logging with initial NID (that might be temporary, and which master can tell us to change) // start logging with initial NID (that might be temporary, and which master can tell us to change)
ctx0 := ctx ctx0 := ctx
defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, node.MasterAddr)(&err) // XXX .State protect with snapshot? (but we are the only mutator - it is ok not to)
defer task.Runningf(&ctx, "%s: talk master(%s)", node.State.MyNID, node.MasterAddr)(&err)
for { for {
node.updateOperational(func() { node.updateOperational(func() {
...@@ -137,12 +138,13 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -137,12 +138,13 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
} }
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error { func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error {
s := node.StateHead()
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.MyInfo.Type, NodeType: node.Type,
NID: node.MyInfo.NID, NID: s.MyNID,
Address: node.MyInfo.Addr, Address: node.Addr,
ClusterName: node.ClusterName, ClusterName: node.ClusterName,
IdTime: node.MyInfo.IdTime, IdTime: s.IdTime,
DevPath: nil, // XXX stub DevPath: nil, // XXX stub
NewNID: nil, // XXX stub NewNID: nil, // XXX stub
} }
...@@ -152,9 +154,11 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -152,9 +154,11 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
} }
return xio.WithCloseOnRetCancel(ctx, mlink, func() (err error) { return xio.WithCloseOnRetCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.MyInfo.NID { if accept.YourNID != s.MyNID {
log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID) log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
node.MyInfo.NID = accept.YourNID // XXX .updateState node.UpdateState(func(s *xneo.State) {
s.MyNID = accept.YourNID
})
} }
// TODO verify Mnid = M*; our nid corresponds to our type // TODO verify Mnid = M*; our nid corresponds to our type
...@@ -176,7 +180,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -176,7 +180,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
// partTab (not to S and secondary M(?)) // partTab (not to S and secondary M(?))
// https://lab.nexedi.com/nexedi/neoppod/blob/v1.12-69-gd98205d0/neo/master/handlers/__init__.py#L60-67 // https://lab.nexedi.com/nexedi/neoppod/blob/v1.12-69-gd98205d0/neo/master/handlers/__init__.py#L60-67
pt := node.State.PartTab pt := node.State.PartTab
if !(node.MyInfo.Type == proto.STORAGE) { if !(node.Type == proto.STORAGE) {
mpt := proto.SendPartitionTable{} mpt := proto.SendPartitionTable{}
_, err = mlink.Expect1(&mpt) _, err = mlink.Expect1(&mpt)
if err != nil { if err != nil {
...@@ -187,8 +191,8 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -187,8 +191,8 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
} }
// update cluster state // update cluster state
node.updateOperational(func() { // XXX -> updateState node.updateState(func(s *xneo.State) {
err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown err = s.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
node.State.PartTab = pt node.State.PartTab = pt
if err == nil { if err == nil {
// keep mlink=nil on shutdown so that // keep mlink=nil on shutdown so that
...@@ -379,20 +383,21 @@ var cmdShutdown = errors.New("master told us to shutdown") ...@@ -379,20 +383,21 @@ var cmdShutdown = errors.New("master told us to shutdown")
// updateNodeTab applies updates to .nodeTab from message and logs changes appropriately. // updateNodeTab applies updates to .nodeTab from message and logs changes appropriately.
// the only possible error is cmdShutdown. // the only possible error is cmdShutdown.
// must be called under .opMu. // must be called under .opMu.
func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error { func (s *_MasteredState) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error {
// XXX msg.IdTime ? // XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
log.Infof(ctx, "<- node: %v", nodeInfo) log.Infof(ctx, "<- node: %v", nodeInfo)
node.State.NodeTab.Update(nodeInfo) s.NodeTab.Update(nodeInfo)
// we have to provide IdTime when requesting identification to other peers // we have to provide IdTime when requesting identification to other peers
// (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master") // (e.g. Spy checks this is what master broadcast them and if not replies "unknown by master")
// TODO .State = DOWN -> ResetLink // TODO .State = DOWN -> ResetLink
if nodeInfo.NID == node.MyInfo.NID { if nodeInfo.NID == s.MyNID {
// XXX recheck locking -> under .updateState // TODO verify .Type is ours
node.MyInfo = nodeInfo // TODO verify .Addr is ours
// TODO .IdTime - do something with it?
// NEO/py currently employs this hack // NEO/py currently employs this hack
// FIXME -> better it be separate command and handled cleanly // FIXME -> better it be separate command and handled cleanly
...@@ -404,6 +409,6 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -404,6 +409,6 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
} }
} }
log.Infof(ctx, "full nodetab:\n%s", node.State.NodeTab) log.Infof(ctx, "full nodetab:\n%s", s.NodeTab)
return nil return nil
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment