Commit 897bd5a7 authored by Kirill Smelkov


parent 775fce3a
......@@ -77,9 +77,9 @@ type _MasteredNode struct {
opReady chan struct{} // reinitialized each time state becomes non-operational
operational bool // cache for state.IsOperational()
flags _MasteredNodeFlags
rxm chan _RxM // TalkMaster -> RecvM1
rxm chan _RxM // TalkMaster -> RecvM1
rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to RecvM1
// XXX just use `.myInfo.NodeType == STORAGE` instead?
// _RxM represents a request or event received from master.
......@@ -133,9 +133,9 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// Notifications to node/partition tables and cluster state are automatically
// handled, while other notifications and requests are passed through to RecvM1.
// The connection to master is persisted by redial as needed.
// Master link is persisted by redialing as needed.
// f is called on every reconnection after identification and protocol prologue.
// f is called on every reconnection to master after identification and protocol prologue.
// See top-level _MasteredNode overview for details.
......@@ -156,6 +156,9 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
defer task.Runningf(&ctx, "%s: talk master(%s)", me0, node.MasterAddr)(&err)
for {
node.updateOperational(func() {
node.mlink = nil
err := node.talkMaster1(ctx, ctx0, f)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err == "reject identification / protocol error" -> shutdown client
......@@ -221,24 +224,19 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
// update cluster state
// XXX locking
err = node.updateNodeTab(ctx, &mnt)
node.state.PartTab = pt
// XXX update "operational"
// update .operational + notify those who were waiting for it
opready := c.updateOperational()
if err != nil { // might be command to shutdown
node.updateOperational(func() {
err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
node.state.PartTab = pt
if err != nil {
// keep mlink=nil on shutdown so that
// .operational does not become true.
node.mlink = mlink
if err != nil {
return err
// XXX update .masterLink + notify waiters
wg := xsync.NewWorkGroup(ctx)
// receive and handle notifications from master
wg.Go(func(ctx context.Context) error {
......@@ -279,7 +277,7 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request)
if δstate {
δpt, err := node.recvδstate(ctx, req.Msg)
toRecvM1 := false
if δpt && (node.flags & δPartTabPassThrough != 0) {
if δpt && (node.rxmFlags & δPartTabPassThrough != 0) {
toRecvM1 = true
if !toRecvM1 {
......@@ -318,74 +316,70 @@ func (node *_MasteredNode) RecvM1() (neonet.Request, error) {
func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt bool, err error) {
δpt = false
// XXX defer unlock ?
switch msg := msg.(type) {
panic(fmt.Sprintf("unexpected message: %T", msg))
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt)
node.state.PartTab = pt
// <- δ(partTab)
case *proto.NotifyPartitionChanges:
δpt = true
panic("TODO δ(partTab)")
// <- δ(nodeTab)
case *proto.NotifyNodeInformation:
err = node.updateNodeTab(ctx, msg) // XXX recheck return (might be command to shutdown)
case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State)
node.state.Code = msg.State
// update .operational + notify those who were waiting for it
opready := node.updateOperational()
node.updateOperational(func() {
switch msg := msg.(type) {
panic(fmt.Sprintf("unexpected message: %T", msg))
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt)
node.state.PartTab = pt
// <- δ(partTab)
case *proto.NotifyPartitionChanges:
δpt = true
panic("TODO δ(partTab)")
// <- δ(nodeTab)
case *proto.NotifyNodeInformation:
err = node.updateNodeTab(ctx, msg) // XXX recheck return (might be command to shutdown)
case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State)
node.state.Code = msg.State
return δpt, err
// updateOperational updates .operational from current state.
// Must be called with .opMu lock held.
// The returned sendReady func must be called by the updateOperational caller after
// the .node.StateMu lock is released - it closes the current .opReady, thereby
// notifying .operational waiters.
func (node *_MasteredNode) updateOperational() (sendReady func()) {
operational := node.state.IsOperational()
//fmt.Printf("\nupdateOperatinal: %v\n", operational)
// updateOperational calls δf under .opMu and updates .operational from current state.
// It also notifies those who were waiting for it if the operational state becomes ready.
func (node *_MasteredNode) updateOperational(δf func()) {
var opready chan struct{}
if operational != node.operational {
node.operational = operational
if operational {
opready = node.opReady // don't close from under opMu
} else {
node.opReady = make(chan struct{}) // remake for next operational waiters
return func() {
if opready != nil {
//fmt.Println("updateOperational - notifying %v\n", opready)
func() {
defer node.opMu.Unlock()
operational := (node.mlink != nil) && node.state.IsOperational()
//fmt.Printf("\nupdateOperatinal: %v\n", operational)
if operational != node.operational {
node.operational = operational
if operational {
opready = node.opReady // don't close from under opMu
} else {
node.opReady = make(chan struct{}) // remake for next operational waiters
if opready != nil {
//fmt.Println("updateOperational - notifying %v\n", opready)
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment