Commit a17bc034 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a0cf235e
......@@ -147,7 +147,7 @@ func (c *Client) recvMaster(ctx context.Context, mlink *_MasterLink) (err error)
defer task.Running(&ctx, "rx")(&err)
for {
req, err := mlink.Recv1()
req, err := mlink.Recv1(ctx)
if err != nil {
return err
}
......
......@@ -28,7 +28,7 @@ import (
"time"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
// "lab.nexedi.com/kirr/go123/xsync"
"lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
......@@ -51,15 +51,14 @@ import (
// δPartTab
// δClusterState
// ↑
// RecvM1 ---------------
// user <-------- | _MasteredNode | <- M
// _MasterLink.Recv1 ---------------
// user <------------------- | _MasteredNode | <- M
// ---------------
//
// This pipeline is operated by TalkMaster.
// The connection to master is persisted by redial as needed.
//
// XXX update after introduction of _MasterLink
// XXX use `nodem *_MasteredNode` XXX naming=?
type _MasteredNode struct {
*xneo.Node
......@@ -75,21 +74,11 @@ type _MasteredNode struct {
opReady chan struct{} // reinitialized each time state becomes non-operational
operational bool // cache for state.IsOperational()
rxm chan _RxM // TalkMaster -> RecvM1
// rxm chan _RxM // TalkMaster -> RecvM1
rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to RecvM1
// XXX just use `.myInfo.NodeType == STORAGE` instead?
}
// _MasterLink represents NodeLink to master with Recv1 filtered through ^^^ XXX
// XXX place
type _MasterLink struct {
*neonet.NodeLink
node *_MasteredNode
}
func (mlink *_MasterLink) Recv1() (neonet.Request, error) {
return mlink.node.recvM1()
}
// _RxM represents a request or event received from master.
type _RxM struct {
Req neonet.Request
......@@ -189,9 +178,6 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
// master pushes whole nodeTab and partTab to us right after identification
// XXX temp hack
// if node.MyInfo.Type == proto.CLIENT {
// nodeTab
mnt := proto.NotifyNodeInformation{}
_, err = mlink.Expect1(&mnt)
......@@ -213,7 +199,6 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
}
// update cluster state
node.updateOperational(func() {
err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
node.State.PartTab = pt
......@@ -228,8 +213,8 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
if err != nil {
return err
}
// }
/*
wg := xsync.NewWorkGroup(ctx)
// receive and handle notifications from master
// XXX no need to spawn "rx prefilter" - just make MasterLink.Recv1() call mlink.Recv1(), check if message should be prefiltered and call Master.recvMaster1. (-> yes)
......@@ -248,16 +233,65 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
}
}
})
*/
// run user code
wg.Go(func(ctx context.Context) error {
// wg.Go(func(ctx context.Context) error {
return f(ctx, &_MasterLink{mlink, node})
})
// })
return wg.Wait()
// return wg.Wait()
})
}
// _MasterLink represents NodeLink to master with Recv1 filtered through _MasteredNode.
type _MasterLink struct {
*neonet.NodeLink
node *_MasteredNode
}
// RecvM1 receives request from master filtered through _MasteredNode δstate handler.
//
// Must be called only when master link is established - e.g. from under TalkMaster.
func (mlink *_MasterLink) Recv1(ctx context.Context) (neonet.Request, error) {
for {
req, err := mlink.NodeLink.Recv1() // cancel on ctx
if err != nil {
// close(node.rxm)
return neonet.Request{}, err
}
// messages for state changes are handled internally
δstate := true
switch req.Msg.(type) {
default: δstate = false
case *proto.SendPartitionTable: // whole partTab
case *proto.NotifyPartitionChanges: // δ(partTab)
case *proto.NotifyNodeInformation: // δ(nodeTab)
case *proto.NotifyClusterState:
}
if δstate {
δpt, err := mlink.node.recvδstate(ctx, req.Msg)
passthrough := false
if δpt && (mlink.node.rxmFlags & δPartTabPassThrough != 0) {
passthrough = true
}
if !passthrough {
req.Close()
}
if err != nil {
return neonet.Request{}, err
}
if !passthrough {
continue
}
}
return req, nil
}
}
/*
// recvMaster1 handles 1 message from master.
func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request) (err error) {
// messages for state changes are handled internally
......@@ -295,19 +329,20 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request)
return nil
}
*/
var errMasterDisconect = errors.New("master disconnected")
// recvM1 receives request from master filtered through δstate handler.
//var errMasterDisconect = errors.New("master disconnected")
//
// Must be called only when master link is established - e.g. from under TalkMaster.
func (node *_MasteredNode) recvM1() (neonet.Request, error) {
rx, ok := <-node.rxm
if !ok {
return neonet.Request{}, errMasterDisconect
}
return rx.Req, rx.Err
}
// // recvM1 receives request from master filtered through δstate handler.
// //
// // Must be called only when master link is established - e.g. from under TalkMaster.
// func (node *_MasteredNode) recvM1() (neonet.Request, error) {
// rx, ok := <-node.rxm
// if !ok {
// return neonet.Request{}, errMasterDisconect
// }
// return rx.Req, rx.Err
// }
//trace:event traceClusterStateChanged(cs *proto.ClusterState)
......@@ -325,7 +360,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
log.Infof(ctx, "parttab update: %s", pt)
log.Infof(ctx, "<- parttab: %s", pt)
node.State.PartTab = pt
// <- δ(partTab)
......@@ -338,7 +373,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
err = node.updateNodeTab(ctx, msg) // XXX recheck return (might be command to shutdown)
case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State)
log.Infof(ctx, "<- state: %s", msg.State)
node.State.Code = msg.State
traceClusterStateChanged(&node.State.Code)
}
......
......@@ -131,7 +131,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *_MasterLink) (reqS
defer task.Runningf(&ctx, "mserve init")(&err)
for {
req, err := mlink.Recv1()
req, err := mlink.Recv1(ctx)
if err != nil {
return nil, err
}
......@@ -233,8 +233,7 @@ func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *
}
for {
// XXX abort on ctx (XXX or upper?)
req, err := mlink.Recv1()
req, err := mlink.Recv1(ctx)
if err != nil {
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment