Commit 6baa7257 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5954b705
...@@ -78,6 +78,11 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -78,6 +78,11 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
log.Infof(ctx, "%s: listening on %s ...", stor.node.MyInfo.NID, addr) log.Infof(ctx, "%s: listening on %s ...", stor.node.MyInfo.NID, addr)
stor.runCtx = ctx stor.runCtx = ctx
defer func() {
__ := stor.back.Close()
err = xerr.First(err, __)
}()
// update our serving address in node // update our serving address in node
naddr, err := proto.Addr(addr) naddr, err := proto.Addr(addr)
if err != nil { if err != nil {
...@@ -85,21 +90,16 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -85,21 +90,16 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
} }
stor.node.MyInfo.Addr = naddr stor.node.MyInfo.Addr = naddr
// wrap listener with link / identificaton hello checker // wrap listener with link / identification hello checker
stor.lli = xneo.NewListener(neonet.NewLinkListener(l)) stor.lli = xneo.NewListener(neonet.NewLinkListener(l))
defer func() { defer func() {
__ := stor.lli.Close() __ := stor.lli.Close()
err = xerr.First(err, __) err = xerr.First(err, __)
}() }()
defer func() {
__ := stor.back.Close()
err = xerr.First(err, __)
}()
// connect to master and let it drive us via commands and updates // connect to master and let it drive us via commands and updates
return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error { return stor.node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
// XXX move -> SetNumReplicas handler // TODO move -> SetNumReplicas handler
// // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1` // // NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
// if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) { // if !(accept.NumPartitions == 1 && accept.NumReplicas == 0) {
// return fmt.Errorf("TODO for 1-storage POC: Npt: %d Nreplica: %d", accept.NumPartitions, accept.NumReplicas) // return fmt.Errorf("TODO for 1-storage POC: Npt: %d Nreplica: %d", accept.NumPartitions, accept.NumReplicas)
...@@ -175,7 +175,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -175,7 +175,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
RowList: stor.node.State.PartTab.Dump()}) RowList: stor.node.State.PartTab.Dump()})
case *proto.LockedTransactions: case *proto.LockedTransactions:
// XXX r/o stub // FIXME r/o stub
err = req.Reply(&proto.AnswerLockedTransactions{}) err = req.Reply(&proto.AnswerLockedTransactions{})
// TODO AskUnfinishedTransactions // TODO AskUnfinishedTransactions
...@@ -363,7 +363,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq * ...@@ -363,7 +363,7 @@ func (stor *Storage) serveLink(ctx context.Context, req *neonet.Request, idReq *
// TODO -> do what go-fuse does: // TODO -> do what go-fuse does:
// - serve request in the goroutine that received it (reduces latency) // - serve request in the goroutine that received it (reduces latency)
// - spawn another goroutine to continue accept loop // - spawn another goroutine to continue accept loop
// - limit number of such accept-loop goroutines by GOMAXPROC // - limit number of such accept-loop goroutines by GOMAXPROCS
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
return stor.serveClient(ctx, req) return stor.serveClient(ctx, req)
}) })
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment