Commit a317f201 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 92be102d
...@@ -75,7 +75,8 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage ...@@ -75,7 +75,8 @@ func NewStorage(clusterName, masterAddr string, net xnet.Networker, back storage
// The storage will be serving incoming connections on l. // The storage will be serving incoming connections on l.
func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
addr := l.Addr() addr := l.Addr()
log.Infof(ctx, "%s: listening on %s ...", stor.node.MyInfo.NID, addr) // XXX WithState?
log.Infof(ctx, "%s: listening on %s ...", stor.node.State.MyNID, addr)
stor.runCtx = ctx stor.runCtx = ctx
defer func() { defer func() {
...@@ -88,7 +89,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -88,7 +89,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
if err != nil { if err != nil {
return fmt.Errorf("run @%s: %s", addr, err) return fmt.Errorf("run @%s: %s", addr, err)
} }
stor.node.MyInfo.Addr = naddr stor.node.LAddr = naddr
// wrap listener with link / identification hello checker // wrap listener with link / identification hello checker
stor.lli = xneo.NewListener(neonet.NewLinkListener(l)) stor.lli = xneo.NewListener(neonet.NewLinkListener(l))
...@@ -213,7 +214,10 @@ func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart * ...@@ -213,7 +214,10 @@ func (stor *Storage) m1serve(ctx context.Context, mlink *_MasterLink, reqStart *
defer task.Runningf(&ctx, "mserve")(&err) defer task.Runningf(&ctx, "mserve")(&err)
// serve clients while operational // serve clients while operational
serveCtx := taskctx.Runningf(stor.runCtx, "%s", stor.node.MyInfo.NID) var serveCtx context.Context
stor.node.WithState(func(s *xneo.State) {
serveCtx = taskctx.Runningf(stor.runCtx, "%s", s.MyNID)
})
serveCtx, serveCancel := xcontext.Merge/*Cancel*/(serveCtx, ctx) serveCtx, serveCancel := xcontext.Merge/*Cancel*/(serveCtx, ctx)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(1) wg.Add(1)
...@@ -325,9 +329,10 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, * ...@@ -325,9 +329,10 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"} return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"}
} }
s := stor.node.StateHead() // XXX ok instaed of stor.node.WithState?
return &proto.AcceptIdentification{ return &proto.AcceptIdentification{
NodeType: stor.node.MyInfo.Type, NodeType: stor.node.Type,
MyNID: stor.node.MyInfo.NID, // XXX lock wrt update MyNID: s.MyNID,
YourNID: idReq.NID, YourNID: idReq.NID,
}, nil }, nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment