Commit a7ad691c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5e7c4dff
...@@ -293,7 +293,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -293,7 +293,7 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
m.mainWG.Go(m.main) m.mainWG.Go(m.main)
err = m.mainWG.Wait() err = m.mainWG.Wait()
// change `... canceled` to just canceled? // change `... canceled` to just canceled
// (e.g. `master: listen: canceled` or `master: main: canceled` -> `master: canceled`) // (e.g. `master: listen: canceled` or `master: main: canceled` -> `master: canceled`)
if ctx.Err() != nil { if ctx.Err() != nil {
err = ctx.Err() err = ctx.Err()
......
...@@ -70,7 +70,6 @@ type _MasteredNode struct { ...@@ -70,7 +70,6 @@ type _MasteredNode struct {
operational bool // cache for Node.State.IsOperational() operational bool // cache for Node.State.IsOperational()
rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to mlink.Recv1 rxmFlags _MasteredNodeFlags // if e.g. δPartTab messages should be delivered to mlink.Recv1
// XXX just use `.node.Type == STORAGE` instead?
} }
type _MasteredNodeFlags int type _MasteredNodeFlags int
...@@ -130,7 +129,6 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -130,7 +129,6 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
// XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second): case <-time.After(1*time.Second):
// ok // ok
} }
......
...@@ -118,7 +118,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -118,7 +118,7 @@ func (stor *Storage) Run(ctx context.Context, l xnet.Listener) (err error) {
}) })
} }
// m1initialize drives storage by master messages during initialization phase // m1initialize drives storage by master messages during initialization phase.
// //
// Initialization includes master retrieving info for cluster recovery and data // Initialization includes master retrieving info for cluster recovery and data
// verification before starting operation. Initialization finishes either // verification before starting operation. Initialization finishes either
...@@ -151,7 +151,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *_MasterLink) (reqS ...@@ -151,7 +151,7 @@ func (stor *Storage) m1initialize(ctx context.Context, mlink *_MasterLink) (reqS
var cmdStart = errors.New("start requested") var cmdStart = errors.New("start requested")
// m1initialize1 handles one message from master from under m1initialize // m1initialize1 handles one message from master from under m1initialize.
func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) error { func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) error {
var err error var err error
...@@ -178,7 +178,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -178,7 +178,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
RowList: mpt.RowList}) RowList: mpt.RowList})
case *proto.LockedTransactions: case *proto.LockedTransactions:
// FIXME r/o stub // XXX r/o stub
err = req.Reply(&proto.AnswerLockedTransactions{}) err = req.Reply(&proto.AnswerLockedTransactions{})
// TODO AskUnfinishedTransactions // TODO AskUnfinishedTransactions
...@@ -187,7 +187,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro ...@@ -187,7 +187,7 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
lastTid, zerr1 := stor.back.LastTid(ctx) lastTid, zerr1 := stor.back.LastTid(ctx)
lastOid, zerr2 := stor.back.LastOid(ctx) lastOid, zerr2 := stor.back.LastOid(ctx)
if zerr := xerr.First(zerr1, zerr2); zerr != nil { if zerr := xerr.First(zerr1, zerr2); zerr != nil {
return zerr // TODO send the error to M ? return zerr // TODO send error to M
} }
err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid}) err = req.Reply(&proto.AnswerLastIDs{LastTid: lastTid, LastOid: lastOid})
...@@ -268,13 +268,14 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error { ...@@ -268,13 +268,14 @@ func (stor *Storage) m1serve1(ctx context.Context, req neonet.Request) error {
// --- serve incoming connections from other nodes --- // --- serve incoming connections from other nodes ---
// XXX doc
func (stor *Storage) serve(ctx context.Context) (err error) { func (stor *Storage) serve(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "serve")(&err) defer task.Runningf(&ctx, "serve")(&err)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
defer wg.Wait() defer wg.Wait()
// XXX ? -> _MasteredNode.Accept(lli) (it will verify IdTime against .nodeTab[nid]) // XXX ? -> _MasteredNode.Accept(lli) (it will verify IdTime against .nodeTab[nid] + ClusterName)
// XXX ? -> Node.Serve(lli -> func(idReq)) // XXX ? -> Node.Serve(lli -> func(idReq))
for { for {
if ctx.Err() != nil { if ctx.Err() != nil {
...@@ -295,7 +296,7 @@ func (stor *Storage) serve(ctx context.Context) (err error) { ...@@ -295,7 +296,7 @@ func (stor *Storage) serve(ctx context.Context) (err error) {
err := xio.WithCloseOnRetCancel(ctx, req.Link(), func() error { err := xio.WithCloseOnRetCancel(ctx, req.Link(), func() error {
return stor.serveLink(ctx, req, idReq) return stor.serveLink(ctx, req, idReq)
}) })
if err == nil { if err != nil {
if ctx.Err() == nil { if ctx.Err() == nil {
// the error is not due to serve cancel // the error is not due to serve cancel
log.Error(ctx, err) log.Error(ctx, err)
...@@ -327,7 +328,7 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, * ...@@ -327,7 +328,7 @@ func (stor *Storage) identify_(idReq *proto.RequestIdentification) (proto.Msg, *
return nil, &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"} return nil, &proto.Error{proto.PROTOCOL_ERROR, "only clients are accepted"}
} }
if idReq.ClusterName != stor.node.ClusterName { if idReq.ClusterName != stor.node.ClusterName {
return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"} return nil, &proto.Error{proto.PROTOCOL_ERROR, "cluster name mismatch"} // XXX -> MasteredNode.Accept
} }
s := stor.node.StateHead() // XXX ok instaed of stor.node.WithState? s := stor.node.StateHead() // XXX ok instaed of stor.node.WithState?
......
...@@ -28,7 +28,7 @@ import ( ...@@ -28,7 +28,7 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto" "lab.nexedi.com/kirr/neo/go/neo/proto"
) )
// PartitionTable represents object space partitioning in a cluster // PartitionTable represents object space partitioning in a cluster.
// //
// It is // It is
// //
......
...@@ -28,7 +28,7 @@ import ( ...@@ -28,7 +28,7 @@ import (
// State represents state of a cluster. // State represents state of a cluster.
type State struct { type State struct {
// XXX +transaction ID? // XXX +transaction ID? (= idtime ?)
NodeTab *NodeTable // nodes in the cluster NodeTab *NodeTable // nodes in the cluster
PartTab *PartitionTable // on which nodes which data is located PartTab *PartitionTable // on which nodes which data is located
...@@ -36,7 +36,7 @@ type State struct { ...@@ -36,7 +36,7 @@ type State struct {
} }
// StateRO provides read-only access to a State. // StateRO provides read-only access to a State.
// XXX -> StateSnapshot ? // XXX -> StateSnapshot ? XXX ROState ?
type StateRO interface { type StateRO interface {
NodeTab() NodeTableRO NodeTab() NodeTableRO
PartTab() PartitionTableRO PartTab() PartitionTableRO
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment