Commit 1c6f7911 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a8818cc3
...@@ -115,7 +115,7 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -115,7 +115,7 @@ func (c *Client) Run(ctx context.Context) (err error) {
ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx) ctx, cancel := xcontext.Merge/*Cancel*/(ctx, runCtx)
defer cancel() defer cancel()
return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error { return c.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX errctx ("on redial"? "connected"?) // XXX errctx ("on redial"? "connected"?)
c.head0 = c.head c.head0 = c.head
...@@ -124,7 +124,7 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -124,7 +124,7 @@ func (c *Client) Run(ctx context.Context) (err error) {
// launch master notifications receiver // launch master notifications receiver
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return c.recvMaster(ctx) return c.recvMaster(ctx, mlink)
}) })
// sync lastTid with master // sync lastTid with master
...@@ -142,11 +142,11 @@ func (c *Client) Run(ctx context.Context) (err error) { ...@@ -142,11 +142,11 @@ func (c *Client) Run(ctx context.Context) (err error) {
} }
// recvMaster receives and handles notifications from master. // recvMaster receives and handles notifications from master.
func (c *Client) recvMaster(ctx context.Context) (err error) { func (c *Client) recvMaster(ctx context.Context, mlink *_MasterLink) (err error) {
defer task.Running(&ctx, "rx")(&err) defer task.Running(&ctx, "rx")(&err)
for { for {
req, err := c.node.RecvM1() req, err := mlink.Recv1()
if err != nil { if err != nil {
return err return err
} }
...@@ -207,7 +207,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error { ...@@ -207,7 +207,7 @@ func (c *Client) invalidateObjects(msg *proto.InvalidateObjects) error {
} }
// syncMaster asks M for DB head right after identification. // syncMaster asks M for DB head right after identification.
func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) { func (c *Client) syncMaster(ctx context.Context, mlink *_MasterLink) (err error) {
defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ? defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ?
// query last_tid // query last_tid
......
...@@ -58,6 +58,8 @@ import ( ...@@ -58,6 +58,8 @@ import (
// //
// This pipeline is operated by TalkMaster. // This pipeline is operated by TalkMaster.
// The connection to master is persisted by redial as needed. // The connection to master is persisted by redial as needed.
//
// XXX update after introduction of _MasterLink
type _MasteredNode struct { type _MasteredNode struct {
myInfo proto.NodeInfo // type, laddr, nid, state, idtime myInfo proto.NodeInfo // type, laddr, nid, state, idtime
ClusterName string ClusterName string
...@@ -81,6 +83,16 @@ type _MasteredNode struct { ...@@ -81,6 +83,16 @@ type _MasteredNode struct {
// XXX just use `.myInfo.NodeType == STORAGE` instead? // XXX just use `.myInfo.NodeType == STORAGE` instead?
} }
// _MasterLink represents NodeLink to master with Recv1 filtered through ^^^ XXX
// XXX place
type _MasterLink struct {
*neonet.NodeLink
node *_MasteredNode
}
func (mlink *_MasterLink) Recv1() (neonet.Request, error) {
return mlink.node.recvM1()
}
// _RxM represents a request or event received from master. // _RxM represents a request or event received from master.
type _RxM struct { type _RxM struct {
Req neonet.Request Req neonet.Request
...@@ -137,7 +149,7 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -137,7 +149,7 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
// See top-level _MasteredNode overview for details. // See top-level _MasteredNode overview for details.
// //
// XXX -> FollowMaster? AdhereMaster? // XXX -> FollowMaster? AdhereMaster?
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *neonet.NodeLink) error) (err error) { func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// me0 describes local node when it starts connecting to master, e.g. 'client C?'. // me0 describes local node when it starts connecting to master, e.g. 'client C?'.
// we don't use just NID because it is initially 0 and because master can tell us to change it. // we don't use just NID because it is initially 0 and because master can tell us to change it.
me0 := strings.ToLower(node.myInfo.Type.String()) me0 := strings.ToLower(node.myInfo.Type.String())
...@@ -176,7 +188,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -176,7 +188,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
} }
} }
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *neonet.NodeLink) error) error { func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error {
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.myInfo.Type, NodeType: node.myInfo.Type,
NID: node.myInfo.NID, NID: node.myInfo.NID,
...@@ -259,7 +271,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -259,7 +271,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
}) })
// run user code // run user code
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return f(ctx, mlink) return f(ctx, &_MasterLink{mlink, node})
}) })
return wg.Wait() return wg.Wait()
...@@ -307,10 +319,10 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request) ...@@ -307,10 +319,10 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request)
var errMasterDisconect = errors.New("master disconnected") var errMasterDisconect = errors.New("master disconnected")
// RecvM1 receives request from master filtered through δstate handler. // recvM1 receives request from master filtered through δstate handler.
// //
// Must be called only when master link is established - e.g. from under TalkMaster. // Must be called only when master link is established - e.g. from under TalkMaster.
func (node *_MasteredNode) RecvM1() (neonet.Request, error) { func (node *_MasteredNode) recvM1() (neonet.Request, error) {
rx, ok := <-node.rxm rx, ok := <-node.rxm
if !ok { if !ok {
return neonet.Request{}, errMasterDisconect return neonet.Request{}, errMasterDisconect
......
...@@ -100,7 +100,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -100,7 +100,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// connect to master and get commands and updates from it // connect to master and get commands and updates from it
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error { return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler // XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1` // // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) { // if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
...@@ -114,7 +114,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -114,7 +114,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
} }
// we got StartOperation command. Let master drive us during service phase. // we got StartOperation command. Let master drive us during service phase.
return stor.m1serve(ctx, reqStart) return stor.m1serve(ctx, mlink, reqStart)
}) })
}) })
...@@ -172,11 +172,11 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -172,11 +172,11 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// return error indicates: // return error indicates:
// - nil: initialization was ok and a command came from master to start operation. // - nil: initialization was ok and a command came from master to start operation.
// - !nil: initialization was cancelled or failed somehow. // - !nil: initialization was cancelled or failed somehow.
func (stor *Storage) m1initialize(ctx context.Context, mlink *neonet.NodeLink) (reqStart *neonet.Request, err error) { func (stor *Storage) m1initialize(ctx context.Context, mlink *_MasterLink) (reqStart *neonet.Request, err error) {
defer task.Runningf(&ctx, "init %v", mlink)(&err) defer task.Runningf(&ctx, "init %s", mlink)(&err)
for { for {
req, err := stor.node.RecvM1() req, err := mlink.Recv1()
if err != nil { if err != nil {
return nil, err return nil, err
} }
...@@ -253,9 +253,8 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -253,9 +253,8 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
// it always returns with an error describing why serve had to be stopped - // it always returns with an error describing why serve had to be stopped -
// either due to master commanding us to stop, or context cancel or some other // either due to master commanding us to stop, or context cancel or some other
// error. // error.
func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err error) { func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *neonet.Request) (err error) {
mlink := reqStart.Link() defer task.Runningf(&ctx, "serve %s", mlink)(&err)
defer task.Runningf(&ctx, "serve %v", mlink)(&err)
// refresh stor.opCtx and cancel it when we finish so that client // refresh stor.opCtx and cancel it when we finish so that client
// handlers know they need to stop operating as master told us to do so. // handlers know they need to stop operating as master told us to do so.
...@@ -275,7 +274,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err ...@@ -275,7 +274,7 @@ func (stor *Storage) m1serve(ctx context.Context, reqStart *neonet.Request) (err
for { for {
// XXX abort on ctx (XXX or upper?) // XXX abort on ctx (XXX or upper?)
req, err := stor.node.RecvM1() req, err := mlink.Recv1()
if err != nil { if err != nil {
return err return err
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment