Commit e699c83b authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 42bff99e
...@@ -136,18 +136,6 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -136,18 +136,6 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
defer wg.Done() defer wg.Done()
stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged stor.serveLink(ctx, req, idReq) // XXX ignore err? -> logged
}() }()
// // handover to main driver
// select {
// case stor.nodeCome <- nodeCome{req, idReq}:
// // ok
//
// case <-ctx.Done():
// // shutdown
// xio.LClose(ctx, req.Link())
// continue
// }
} }
// }(serveCtx) // }(serveCtx)
}(ctx) }(ctx)
...@@ -158,6 +146,12 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -158,6 +146,12 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
// XXX log err? // XXX log err?
err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error { err = stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *neonet.NodeLink) error {
// XXX move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
// }
// let master initialize us. If successful this ends with StartOperation command. // let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink) reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil { if err != nil {
...@@ -183,93 +177,6 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -183,93 +177,6 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
return err return err
} }
// --- connect to master and let it drive us ---
// XXX move -> SetNumReplicas handler
/*
// NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
*/
/* XXX kill
// talkMaster connects to master, announces self and receives commands and notifications.
// it tries to persist master link reconnecting as needed.
//
// it always returns an error - either due to cancel or command from master to shutdown.
func (stor *Storage) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "talk master(%v)", stor.node.MasterAddr)(&err)
// XXX dup wrt Client.talkMaster
for {
err := stor.talkMaster1(ctx)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err = shutdown -> return
// exit on cancel / throttle reconnecting
select {
case <-ctx.Done():
return ctx.Err()
// XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second):
// ok
}
}
}
// talkMaster1 does 1 cycle of connect/talk/disconnect to master.
//
// it returns error describing why such cycle had to finish.
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
// FIXME dup in MasteredNode.talkMaster1
node := stor.node
reqID := &proto.RequestIdentification{
NodeType: node.MyInfo.Type,
NID: node.MyInfo.NID,
Address: node.MyInfo.Addr,
ClusterName: node.ClusterName,
IdTime: node.MyInfo.IdTime, // XXX ok?
DevPath: nil, // XXX stub
NewNID: nil, // XXX stub
}
mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID)
if err != nil {
return err
}
defer xio.CloseWhenDone(ctx, mlink)()
// XXX add master NID -> nodeTab ? or master will notify us with it himself ?
// XXX -> node.Dial ?
if accept.YourNID != stor.node.MyInfo.NID {
log.Infof(ctx, "master told us to have nid=%v", accept.YourNID)
stor.node.MyInfo.NID = accept.YourNID
}
// XXX the first packet M sends always is NotifyNodeInformation (with us)
// -> receive it first via Expect1
// handle notifications and commands from master
// let master initialize us. If successful this ends with StartOperation command.
reqStart, err := stor.m1initialize(ctx, mlink)
if err != nil {
//log.Error(ctx, err)
return err
}
// we got StartOperation command. Let master drive us during service phase.
err = stor.m1serve(ctx, reqStart)
//log.Error(ctx, err)
return err
}
*/
// m1initialize drives storage by master messages during initialization phase // m1initialize drives storage by master messages during initialization phase
// //
// Initialization includes master retrieving info for cluster recovery and data // Initialization includes master retrieving info for cluster recovery and data
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment