Commit edb2fec5 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 22a44b94
...@@ -22,7 +22,8 @@ package neo ...@@ -22,7 +22,8 @@ package neo
// Master organization // Master organization
// //
// Master is organized as follows: // Master is the node tha oversees how whole cluster works, ... XXX
// It is organized as follows:
// //
// - main task that controls whole logic of master working. It spawns // - main task that controls whole logic of master working. It spawns
// subtasks to implement that logic and communicate with the subtask via channels. XXX // subtasks to implement that logic and communicate with the subtask via channels. XXX
...@@ -46,7 +47,7 @@ import ( ...@@ -46,7 +47,7 @@ import (
"fmt" "fmt"
"time" "time"
xxcontext "lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xnet" "lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync" "lab.nexedi.com/kirr/go123/xsync"
...@@ -57,7 +58,7 @@ import ( ...@@ -57,7 +58,7 @@ import (
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
"lab.nexedi.com/kirr/neo/go/internal/log" "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task" "lab.nexedi.com/kirr/neo/go/internal/task"
"lab.nexedi.com/kirr/neo/go/internal/xcontext" xxcontext "lab.nexedi.com/kirr/neo/go/internal/xcontext"
"lab.nexedi.com/kirr/neo/go/internal/xio" "lab.nexedi.com/kirr/neo/go/internal/xio"
) )
...@@ -258,7 +259,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -258,7 +259,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
req, idReq, err := lli.Accept(ctx) req, idReq, err := lli.Accept(ctx)
if err != nil { if err != nil {
if !xcontext.Canceled(err) { if !xxcontext.Canceled(err) {
log.Error(ctx, err) // XXX throttle? log.Error(ctx, err) // XXX throttle?
} }
continue continue
...@@ -449,7 +450,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -449,7 +450,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
goStorCtlRecovery := func(stor *_MasteredPeer) { goStorCtlRecovery := func(stor *_MasteredPeer) {
inprogress++ inprogress++
stor.wg.Go(func(peerCtx context.Context) error { stor.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx) ctx, cancel := xcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel() defer cancel()
var pt *xneo.PartitionTable var pt *xneo.PartitionTable
...@@ -464,7 +465,7 @@ func (m *Master) recovery(ctx context.Context) (err error) { ...@@ -464,7 +465,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
<-ack <-ack
// canceled recovery does not mean we should down the storage node // canceled recovery does not mean we should down the storage node
if xcontext.Canceled(err) { if xxcontext.Canceled(err) {
err = nil err = nil
} }
return err return err
...@@ -675,7 +676,7 @@ func (m *Master) verify(ctx context.Context) (err error) { ...@@ -675,7 +676,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
<-ack <-ack
// canceled verify does not mean we should down the storage node // canceled verify does not mean we should down the storage node
if xcontext.Canceled(err) { if xxcontext.Canceled(err) {
err = nil err = nil
} }
return err return err
...@@ -866,7 +867,7 @@ func (m *Master) serve(ctx context.Context) (err error) { ...@@ -866,7 +867,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
goServe := func(peer *_MasteredPeer) { goServe := func(peer *_MasteredPeer) {
inprogress++ inprogress++
peer.wg.Go(func(peerCtx context.Context) error { peer.wg.Go(func(peerCtx context.Context) error {
ctx, cancel := xxcontext.Merge/*Cancel*/(ctx, peerCtx) ctx, cancel := xcontext.Merge/*Cancel*/(ctx, peerCtx)
defer cancel() defer cancel()
switch peer.node.Type { switch peer.node.Type {
...@@ -884,7 +885,7 @@ func (m *Master) serve(ctx context.Context) (err error) { ...@@ -884,7 +885,7 @@ func (m *Master) serve(ctx context.Context) (err error) {
<-ack <-ack
// canceled serve does not necessarily mean we should down the peer // canceled serve does not necessarily mean we should down the peer
if xcontext.Canceled(err) { if xxcontext.Canceled(err) {
err = nil err = nil
} }
return err return err
...@@ -1293,7 +1294,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) { ...@@ -1293,7 +1294,7 @@ func (p *_MasteredPeer) notify(ctx context.Context) (err error) {
stateCode := p.state0.Code stateCode := p.state0.Code
// XXX vvv right? // XXX vvv right?
return xcontext.WithCloseOnErrCancel(ctx, p.node.Link(), func() error { return xxcontext.WithCloseOnErrCancel(ctx, p.node.Link(), func() error {
for { for {
var δstate _ΔClusterState var δstate _ΔClusterState
......
...@@ -27,6 +27,10 @@ import ( ...@@ -27,6 +27,10 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/neo/go/neo/neonet" "lab.nexedi.com/kirr/neo/go/neo/neonet"
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
"lab.nexedi.com/kirr/neo/go/neo/storage" "lab.nexedi.com/kirr/neo/go/neo/storage"
...@@ -38,10 +42,6 @@ import ( ...@@ -38,10 +42,6 @@ import (
taskctx "lab.nexedi.com/kirr/neo/go/internal/xcontext/task" taskctx "lab.nexedi.com/kirr/neo/go/internal/xcontext/task"
"lab.nexedi.com/kirr/neo/go/internal/xio" "lab.nexedi.com/kirr/neo/go/internal/xio"
"lab.nexedi.com/kirr/neo/go/internal/xzodb" "lab.nexedi.com/kirr/neo/go/internal/xzodb"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
) )
// Storage is NEO node that keeps data and provides read/write access to it via network. // Storage is NEO node that keeps data and provides read/write access to it via network.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment