diff --git a/go/neo/server/cluster_test.go b/go/neo/server/cluster_test.go
index afe826df4958209040388f672d0211154560171d..1912c70c5778f84db6d9bc8fba90baa86d2c2570 100644
--- a/go/neo/server/cluster_test.go
+++ b/go/neo/server/cluster_test.go
@@ -100,13 +100,25 @@ func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceClusterState {
 
 // nodetab entry changed
 type traceNode struct {
-	NodeTab  unsafe.Pointer // *neo.NodeTable XXX not to noise test diff output
+	NodeTab  unsafe.Pointer // *neo.NodeTable XXX not to noise test diff
 	NodeInfo neo.NodeInfo
 }
 
 func (t *MyTracer) traceNode(nt *neo.NodeTable, n *neo.Node) {
 	t.Trace1(&traceNode{unsafe.Pointer(nt), n.NodeInfo})
 }
 
+// master ready to start changed
+type traceMStartReady struct {
+	Master unsafe.Pointer // *Master XXX not to noise test diff
+	Ready  bool
+}
+func (t *MyTracer) traceMasterStartReady(m *Master, ready bool) {
+	t.Trace1(masterStartReady(m, ready))
+}
+func masterStartReady(m *Master, ready bool) *traceMStartReady {
+	return &traceMStartReady{unsafe.Pointer(m), ready}
+}
+
 // vclock is a virtual clock
 // XXX place -> util?
@@ -144,6 +156,7 @@ func TestMasterStorage(t *testing.T) {
 	neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre)
 	neo_traceClusterStateChanged_Attach(pg, tracer.traceClusterState)
 	neo_traceNodeChanged_Attach(pg, tracer.traceNode)
+	traceMasterStartReady_Attach(pg, tracer.traceMasterStartReady)
 	tracing.Unlock()
 
 
@@ -159,11 +172,11 @@ func TestMasterStorage(t *testing.T) {
 		return a
 	}
 
-	// shortcut for net tx event
-	// XXX -> NetTx ?
-	nettx := func(src, dst, pkt string) *xnet.TraceTx {
-		return &xnet.TraceTx{Src: xaddr(src), Dst: xaddr(dst), Pkt: []byte(pkt)}
-	}
+	// // shortcut for net tx event
+	// // XXX -> NetTx ?
+	// nettx := func(src, dst, pkt string) *xnet.TraceTx {
+	// 	return &xnet.TraceTx{Src: xaddr(src), Dst: xaddr(dst), Pkt: []byte(pkt)}
+	// }
 
 	// shortcut for net connect event
 	// XXX -> NetConnect ?
@@ -211,13 +224,13 @@ func TestMasterStorage(t *testing.T) {
 		_ = err // XXX
 	})
 
-
-	// expect:
+	// M starts listening
 	tc.Expect(netlisten("m:1"))
-
 	tc.Expect(node(M.nodeTab, "m:1", neo.MASTER, 1, neo.RUNNING, 0.0))
 	tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering))
 
+	// TODO create C; C tries connect to master - rejected ("not yet operational")
+
 	// start storage
 	zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
 	S := NewStorage("abc1", "m:1", ":1", Shost, zstor)
@@ -228,16 +241,11 @@ func TestMasterStorage(t *testing.T) {
 		_ = err // XXX
 	})
 
-	// expect:
+	// S starts listening
 	tc.Expect(netlisten("s:1"))
-	tc.Expect(netconnect("s:2", "m:2", "m:1"))
-
-	//tc.ExpectPar(
-	//	nettx("s:1", "m:1", "\x00\x00\x00\x01"), // handshake
-	//	nettx("m:1", "s:1", "\x00\x00\x00\x01"),
-	//)
-	_ = nettx
 
+	// S connects M
+	tc.Expect(netconnect("s:2", "m:2", "m:1"))
 	tc.Expect(conntx("s:2", "m:2", 1, &neo.RequestIdentification{
 		NodeType: neo.STORAGE,
 		NodeUUID: 0,
@@ -256,7 +264,7 @@ func TestMasterStorage(t *testing.T) {
 		YourNodeUUID: neo.UUID(neo.STORAGE, 1),
 	}))
 
-	// TODO test ID rejects
+	// TODO test ID rejects (uuid already registered, ...)
 
 	// M starts recovery on S
 	tc.Expect(conntx("m:2", "s:2", 1, &neo.Recovery{}))
@@ -273,6 +281,14 @@ func TestMasterStorage(t *testing.T) {
 		RowList: []neo.RowInfo{},
 	}))
 
+	// M ready to start: new cluster, no in-progress S recovery
+	tc.Expect(masterStartReady(M, true))
+
+
+	// XXX M.partTab = ø
+	// XXX M can start -> writes parttab to S and goes to verification
+
+	// XXX M.partTab <- ...
 
 	// XXX updated something	cluster currently can be operational
diff --git a/go/neo/server/master.go b/go/neo/server/master.go
index 12a37056303cf1d71e82aec3eaaa8ca29b209061..d67019deccbf923722986b0689a8d63b2cca302d 100644
--- a/go/neo/server/master.go
+++ b/go/neo/server/master.go
@@ -66,7 +66,6 @@ type Master struct {
 	nodeCome  chan nodeCome  // node connected XXX -> acceptq?
 	nodeLeave chan nodeLeave // node disconnected XXX -> don't need
 
-
 	// so tests could override
 	monotime func() float64
 }
@@ -268,6 +267,8 @@ func (m *Master) runMain(ctx context.Context) (err error) {
 // - retrieve and recover latest previously saved partition table from storages
 // - monitor whether partition table becomes operational wrt currently up nodeset
 // - if yes - finish recovering upon receiving "start" command XXX or autostart
+// - start is also allowed if storages connected and say there is no partition
+//   table saved to them (empty new cluster case).
 
 // storRecovery is result of 1 storage node passing recovery phase
 type storRecovery struct {
@@ -290,14 +291,18 @@ func (m *Master) recovery(ctx context.Context) (err error) {
 	ctx, rcancel := context.WithCancel(ctx)
 	defer rcancel()
 
+//trace:event traceMasterStartReady(m *Master, ready bool)
 	readyToStart := false
+
 	recovery := make(chan storRecovery)
+	inprogress := 0 // in-progress stor recoveries
 	wg := sync.WaitGroup{}
 
 	// start recovery on all storages we are currently in touch with
 	// XXX close links to clients
 	for _, stor := range m.nodeTab.StorageList() {
 		if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
+			inprogress++
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
@@ -314,16 +319,21 @@ loop:
 			node, resp := m.identify(ctx, n, /* XXX only accept storages -> PENDING */)
 			// XXX set node.State = PENDING
 
+			if node == nil {
+				wg.Add(1)
+				go func() {
+					defer wg.Done()
+					m.reject(ctx, n.conn, resp)
+				}()
+				return
+			}
+
 			// if new storage arrived - start recovery on it too
+			inprogress++
 			wg.Add(1)
 			go func() {
 				defer wg.Done()
-				if node == nil {
-					m.reject(ctx, n.conn, resp)
-					return
-				}
-
 				err := m.accept(ctx, n.conn, resp)
 				if err != nil {
 					// XXX move this m.nodeLeave <- to accept() ?
@@ -338,6 +348,8 @@ loop:
 		// a storage node came through recovery - let's see whether
 		// ptid ↑ and if so we should take partition table from there
 		case r := <-recovery:
+			inprogress--
+
 			if r.err != nil {
 				log.Error(ctx, r.err)
 
@@ -359,10 +371,27 @@ loop:
 			}
 
 			// update indicator whether cluster currently can be operational or not
-			readyToStart = m.partTab.OperationalWith(m.nodeTab) // XXX + node state
+			var ready bool
+			if m.partTab.PTid == 0 {
+				// new cluster - allow startup if we have some storages passed
+				// recovery and there is no in-progress recovery running
+				nok := 0
+				for _, stor := range m.nodeTab.StorageList() {
+					if stor.NodeState > neo.DOWN {
+						nok++
+					}
+				}
+				ready = (nok > 0 && inprogress == 0)
+
+			} else {
+				ready = m.partTab.OperationalWith(m.nodeTab) // XXX + node state
+			}
 
-			// XXX handle case of new cluster - when no storage reports valid parttab
-			// XXX -> create new parttab
+			if readyToStart != ready {
+				readyToStart = ready
+				traceMasterStartReady(m, ready)
+			}
+			// XXX -> create new parttab for new-cluster case
 
 
 		// request to start the cluster - if ok we exit replying ok
diff --git a/go/neo/server/ztrace.go b/go/neo/server/ztrace.go
new file mode 100644
index 0000000000000000000000000000000000000000..7fd52c2784db13b567b41a5d80d369832a1ec643
--- /dev/null
+++ b/go/neo/server/ztrace.go
@@ -0,0 +1,39 @@
+// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
+
+package server
+// code generated for tracepoints
+
+import (
+	"lab.nexedi.com/kirr/neo/go/xcommon/tracing"
+	"unsafe"
+)
+
+// traceevent: traceMasterStartReady(m *Master, ready bool)
+
+type _t_traceMasterStartReady struct {
+	tracing.Probe
+	probefunc func(m *Master, ready bool)
+}
+
+var _traceMasterStartReady *_t_traceMasterStartReady
+
+func traceMasterStartReady(m *Master, ready bool) {
+	if _traceMasterStartReady != nil {
+		_traceMasterStartReady_run(m, ready)
+	}
+}
+
+func _traceMasterStartReady_run(m *Master, ready bool) {
+	for p := _traceMasterStartReady; p != nil; p = (*_t_traceMasterStartReady)(unsafe.Pointer(p.Next())) {
+		p.probefunc(m, ready)
+	}
+}
+
+func traceMasterStartReady_Attach(pg *tracing.ProbeGroup, probe func(m *Master, ready bool)) *tracing.Probe {
+	p := _t_traceMasterStartReady{probefunc: probe}
+	tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceMasterStartReady)), &p.Probe)
+	return &p.Probe
+}
+
+// trace export signature
+func _trace_exporthash_1002eef247af7731924a09f42e9c3f4131f5bccb() {}
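---
Note (not part of the patch): a minimal sketch of how a probe hooks into the generated traceMasterStartReady tracepoint, mirroring the attach pattern in cluster_test.go above. Only traceMasterStartReady_Attach's signature comes from ztrace.go; tracing.Lock() and the zero-value tracing.ProbeGroup are assumptions based on that test.

// Illustrative sketch only - assumes tracing.Lock()/Unlock() and a zero-value
// tracing.ProbeGroup are usable as in cluster_test.go.
package server

import (
	"fmt"

	"lab.nexedi.com/kirr/neo/go/xcommon/tracing"
)

func exampleAttachMasterStartReady() {
	pg := &tracing.ProbeGroup{}

	tracing.Lock()
	// the probe fires on every traceMasterStartReady(m, ready) call, e.g. from
	// Master.recovery when the ready-to-start indicator flips.
	traceMasterStartReady_Attach(pg, func(m *Master, ready bool) {
		fmt.Printf("master %p ready=%v\n", m, ready)
	})
	tracing.Unlock()
}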