Commit 7087d5f4 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent de6e50fc
...@@ -49,160 +49,42 @@ import ( ...@@ -49,160 +49,42 @@ import (
) )
/* // M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func TestMasterStorage0(t0 *testing.T) { func TestMasterStorage(t0 *testing.T) {
t := NewTestCluster(t0, "abc1") t := NewTestCluster(t0, "abc1")
defer t.Stop() defer t.Stop()
M := t.NewMaster("m") M := t.NewMaster("m")
//zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs") zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
S := t.NewStorage("s", "m:1", zback) // XXX do we need to provide Mlist here? S := t.NewStorage("s", "m:1", zback) // XXX do we need to provide Mlist here?
C := t.NewClient("c") C := t.NewClient("c", "m:1")
// start nodes XXX move starting to TestCluster?
gwg, gctx := errgroup.WithContext(bg)
//defer xwait(gwg) XXX not yet correctly stopped on context cancel
gwg.Go(func() error {
return M.Run(gctx)
})
gwg.Go(func() error {
return S.Run(gctx)
})
gwg.Go(func() error {
return C.run(gctx)
})
tM := t.Checker("m") tM := t.Checker("m")
tS := t.Checker("s") tS := t.Checker("s")
tC := t.Checker("c")
tMS := t.Checker("m-s") tMS := t.Checker("m-s")
tSM := t.Checker("s-m") tSM := t.Checker("s-m")
tCM := t.Checker("c-m")
// M starts listening tMC := t.Checker("m-c")
tM.Expect(netlisten("m:1")) tCS := t.Checker("c-s")
tM.Expect(δnode("m", "m:1", proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone))
tM.Expect(clusterState("m", proto.ClusterRecovering))
// TODO create C; C tries connect to master - rejected ("not yet operational")
// S starts listening
tS.Expect(netlisten("s:1"))
// S connects M
tSM.Expect(netconnect("s:2", "m:2", "m:1"))
tSM.Expect(conntx("s:2", "m:2", 1, &proto.RequestIdentification{
NodeType: proto.STORAGE,
UUID: 0,
Address: xnaddr("s:1"),
ClusterName: "abc1",
IdTime: proto.IdTimeNone,
}))
tM.Expect(δnode("m", "s:1", proto.STORAGE, 1, proto.PENDING, 0.01))
tSM.Expect(conntx("m:2", "s:2", 1, &proto.AcceptIdentification{
NodeType: proto.MASTER,
MyUUID: proto.UUID(proto.MASTER, 1),
NumPartitions: 1,
NumReplicas: 0,
YourUUID: proto.UUID(proto.STORAGE, 1),
}))
// TODO test ID rejects (uuid already registered, ...)
// M starts recovery on S
tMS.Expect(conntx("m:2", "s:2", 0, &proto.Recovery{}))
tMS.Expect(conntx("s:2", "m:2", 0, &proto.AnswerRecovery{
// empty new node
PTid: 0,
BackupTid: proto.INVALID_TID,
TruncateTid: proto.INVALID_TID,
}))
tMS.Expect(conntx("m:2", "s:2", 2, &proto.AskPartitionTable{}))
tMS.Expect(conntx("s:2", "m:2", 2, &proto.AnswerPartitionTable{
PTid: 0,
RowList: []proto.RowInfo{},
}))
// M ready to start: new cluster, no in-progress S recovery
tM.Expect(masterStartReady("m", true))
}
*/
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) {
rt := NewEventRouter()
dispatch := tracetest.NewEventDispatcher(rt)
tracer := NewTraceCollector(dispatch)
net := pipenet.New("testnet") // test network
tracer.Attach()
defer tracer.Detach()
// XXX -> M = testenv.NewMaster("m") (mkhost, chan, register to tracer ...)
// XXX ----//---- S, C
Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer)
Chost := xnet.NetTrace(net.Host("c"), tracer)
cM := tracetest.NewSyncChan("m.main") // trace of events local to M
cS := tracetest.NewSyncChan("s.main") // trace of events local to S XXX with cause root also on S
cC := tracetest.NewSyncChan("c.main")
cMS := tracetest.NewSyncChan("m-s") // trace of events with cause root being m -> s send
cSM := tracetest.NewSyncChan("s-m") // trace of events with cause root being s -> m send
cMC := tracetest.NewSyncChan("m-c") // ----//---- m -> c
cCM := tracetest.NewSyncChan("c-m") // ----//---- c -> m
cCS := tracetest.NewSyncChan("c-s") // ----//---- c -> s
tM := tracetest.NewEventChecker(t, dispatch, cM)
tS := tracetest.NewEventChecker(t, dispatch, cS)
tC := tracetest.NewEventChecker(t, dispatch, cC)
tMS := tracetest.NewEventChecker(t, dispatch, cMS)
tSM := tracetest.NewEventChecker(t, dispatch, cSM)
tMC := tracetest.NewEventChecker(t, dispatch, cMC)
tCM := tracetest.NewEventChecker(t, dispatch, cCM)
tCS := tracetest.NewEventChecker(t, dispatch, cCS)
rt.BranchNode("m", cM)
rt.BranchNode("s", cS)
rt.BranchLink("s-m", cSM, cMS)
rt.BranchLink("c-m", cCM, cMC)
rt.BranchLink("c-s", cCS, rt.defaultq /* S never pushes to C */)
// rt.BranchState("s", cMS) // state on S is controlled by M
// rt.BranchState("c", cMC) // state on C is controlled by M
rt.BranchNode("c", cC)
// cluster nodes
M := tNewMaster("abc1", ":1", Mhost)
zstor := xfs1stor("../zodb/storage/fs1/testdata/1.fs")
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
S := tNewStorage("abc1", "m:1", ":1", Shost, zback)
C := newClient("abc1", "m:1", Chost)
// let tracer know how to map state addresses to node names
tracer.RegisterNode(M.node, "m") // XXX better Mhost.Name() ?
tracer.RegisterNode(S.node, "s")
tracer.RegisterNode(C.node, "c")
gwg := &errgroup.Group{}
// ---------------------------------------- // ----------------------------------------
// start master
Mclock := &vclock{}
M.monotime = Mclock.monotime
Mctx, Mcancel := context.WithCancel(bg)
gox(gwg, func() {
err := M.Run(Mctx)
fmt.Println("M err: ", err)
exc.Raiseif(err)
})
// start storage
Sctx, Scancel := context.WithCancel(bg)
gox(gwg, func() {
err := S.Run(Sctx)
fmt.Println("S err: ", err)
exc.Raiseif(err)
})
// trace
// M starts listening // M starts listening
tM.Expect(netlisten("m:1")) tM.Expect(netlisten("m:1"))
tM.Expect(δnode("m", "m:1", proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone)) tM.Expect(δnode("m", "m:1", proto.MASTER, 1, proto.RUNNING, proto.IdTimeNone))
...@@ -213,7 +95,9 @@ func TestMasterStorage(t *testing.T) { ...@@ -213,7 +95,9 @@ func TestMasterStorage(t *testing.T) {
// S starts listening // S starts listening
tS.Expect(netlisten("s:1")) tS.Expect(netlisten("s:1"))
// S connects M // S connects M
tSM.Expect(netdial("s", "m:1"))
tSM.Expect(netconnect("s:2", "m:2", "m:1")) tSM.Expect(netconnect("s:2", "m:2", "m:1"))
tSM.Expect(conntx("s:2", "m:2", 1, &proto.RequestIdentification{ tSM.Expect(conntx("s:2", "m:2", 1, &proto.RequestIdentification{
NodeType: proto.STORAGE, NodeType: proto.STORAGE,
...@@ -223,6 +107,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -223,6 +107,7 @@ func TestMasterStorage(t *testing.T) {
IdTime: proto.IdTimeNone, IdTime: proto.IdTimeNone,
})) }))
tM.Expect(δnode("m", "s:1", proto.STORAGE, 1, proto.PENDING, 0.01)) tM.Expect(δnode("m", "s:1", proto.STORAGE, 1, proto.PENDING, 0.01))
tSM.Expect(conntx("m:2", "s:2", 1, &proto.AcceptIdentification{ tSM.Expect(conntx("m:2", "s:2", 1, &proto.AcceptIdentification{
...@@ -253,7 +138,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -253,7 +138,6 @@ func TestMasterStorage(t *testing.T) {
// M ready to start: new cluster, no in-progress S recovery // M ready to start: new cluster, no in-progress S recovery
tM.Expect(masterStartReady("m", true)) tM.Expect(masterStartReady("m", true))
// ---------------------------------------- // ----------------------------------------
// M <- start cmd // M <- start cmd
...@@ -312,22 +196,12 @@ func TestMasterStorage(t *testing.T) { ...@@ -312,22 +196,12 @@ func TestMasterStorage(t *testing.T) {
// TODO S join while service // TODO S join while service
// TODO M.Stop while service // TODO M.Stop while service
// ---------------------------------------- // ----------------------------------------
// XXX try starting client from the beginning // trace of client start
// start client
Cctx, Ccancel := context.WithCancel(bg)
gox(gwg, func() {
err := C.run(Cctx)
fmt.Println("C err: ", err)
exc.Raiseif(err)
})
// trace
// C connects M // C connects M
tCM.Expect(netdial("c", "m:1"))
tCM.Expect(netconnect("c:1", "m:3", "m:1")) tCM.Expect(netconnect("c:1", "m:3", "m:1"))
tCM.Expect(conntx("c:1", "m:3", 1, &proto.RequestIdentification{ tCM.Expect(conntx("c:1", "m:3", 1, &proto.RequestIdentification{
NodeType: proto.CLIENT, NodeType: proto.CLIENT,
...@@ -372,7 +246,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -372,7 +246,6 @@ func TestMasterStorage(t *testing.T) {
tC.Expect(δnode("c", "s:1", proto.STORAGE, 1, proto.RUNNING, 0.01)) tC.Expect(δnode("c", "s:1", proto.STORAGE, 1, proto.RUNNING, 0.01))
tC.Expect(δnode("c", "", proto.CLIENT, 1, proto.RUNNING, 0.02)) tC.Expect(δnode("c", "", proto.CLIENT, 1, proto.RUNNING, 0.02))
// ---------------------------------------- // ----------------------------------------
// C asks M about last tid XXX better master sends it itself on new client connected // C asks M about last tid XXX better master sends it itself on new client connected
...@@ -414,6 +287,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -414,6 +287,7 @@ func TestMasterStorage(t *testing.T) {
// trace // trace
// ... -> connects to S // ... -> connects to S
tCS.Expect(netdial("c", "s:1"))
tCS.Expect(netconnect("c:2", "s:3", "s:1")) tCS.Expect(netconnect("c:2", "s:3", "s:1"))
tCS.Expect(conntx("c:2", "s:3", 1, &proto.RequestIdentification{ tCS.Expect(conntx("c:2", "s:3", 1, &proto.RequestIdentification{
NodeType: proto.CLIENT, NodeType: proto.CLIENT,
...@@ -450,7 +324,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -450,7 +324,6 @@ func TestMasterStorage(t *testing.T) {
xwait(wg) xwait(wg)
// ---------------------------------------- // ----------------------------------------
// verify NextSerial is properly returned in AnswerObject via trace-loading prev. revision of obj1 // verify NextSerial is properly returned in AnswerObject via trace-loading prev. revision of obj1
...@@ -494,7 +367,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -494,7 +367,7 @@ func TestMasterStorage(t *testing.T) {
// XXX hack: disable tracing early so that C.Load() calls do not deadlock // XXX hack: disable tracing early so that C.Load() calls do not deadlock
// TODO refactor cluster creation into func // TODO refactor cluster creation into func
// TODO move client all loading tests into separate test where tracing will be off // TODO move client all loading tests into separate test where tracing will be off
tracer.Detach() t.gotracer.Detach()
for { for {
_, dataIter, err := ziter.NextTxn(bg) _, dataIter, err := ziter.NextTxn(bg)
...@@ -542,9 +415,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -542,9 +415,6 @@ func TestMasterStorage(t *testing.T) {
} }
// TODO S.Stop() or Scancel() // TODO S.Stop() or Scancel()
// expect: // expect:
// M.nodeTab -= S // M.nodeTab -= S
...@@ -557,12 +427,6 @@ func TestMasterStorage(t *testing.T) { ...@@ -557,12 +427,6 @@ func TestMasterStorage(t *testing.T) {
// (M needs to resend to all storages recovery messages just from start) // (M needs to resend to all storages recovery messages just from start)
time.Sleep(100*time.Millisecond) // XXX temp so net tx'ers could actually tx time.Sleep(100*time.Millisecond) // XXX temp so net tx'ers could actually tx
return
Mcancel() // FIXME ctx cancel not fully handled
Scancel() // ---- // ----
Ccancel() // ---- // ----
xwait(gwg)
} }
......
...@@ -32,6 +32,7 @@ import ( ...@@ -32,6 +32,7 @@ import (
"lab.nexedi.com/kirr/neo/go/xcommon/xtracing/tracetest" "lab.nexedi.com/kirr/neo/go/xcommon/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/storage" "lab.nexedi.com/kirr/neo/go/neo/storage"
"lab.nexedi.com/kirr/neo/go/zodb"
) )
...@@ -50,7 +51,7 @@ type TestCluster struct { ...@@ -50,7 +51,7 @@ type TestCluster struct {
nodeTab map[string/*node*/]*tNode nodeTab map[string/*node*/]*tNode
checkTab map[string/*node*/]*tracetest.EventChecker checkTab map[string/*node*/]*tracetest.EventChecker
ttest testing.TB // original testing env this cluster was created at testing.TB // original testing env this cluster was created at
} }
// tNode represents information about a test node ... XXX // tNode represents information about a test node ... XXX
...@@ -59,9 +60,20 @@ type tNode struct { ...@@ -59,9 +60,20 @@ type tNode struct {
} }
// XXX stub // XXX stub
type ITestMaster interface {} type ITestMaster interface {
type ITestStorage interface {} Run(ctx context.Context) error
type ITestClient interface {} Start() error
}
type ITestStorage interface {
Run(ctx context.Context) error
}
type ITestClient interface {
run(ctx context.Context) error
zodb.IStorageDriver
}
// NewTestCluster creates new NEO test cluster. // NewTestCluster creates new NEO test cluster.
// //
...@@ -77,7 +89,7 @@ func NewTestCluster(ttest testing.TB, name string) *TestCluster { ...@@ -77,7 +89,7 @@ func NewTestCluster(ttest testing.TB, name string) *TestCluster {
checkTab: make(map[string]*tracetest.EventChecker), checkTab: make(map[string]*tracetest.EventChecker),
//... XXX //... XXX
ttest: ttest, TB: ttest,
} }
t.erouter = NewEventRouter() t.erouter = NewEventRouter()
...@@ -134,7 +146,7 @@ func (t *TestCluster) registerNewNode(name string) *tNode { ...@@ -134,7 +146,7 @@ func (t *TestCluster) registerNewNode(name string) *tNode {
// tracechecker for events on node // tracechecker for events on node
c1 := tracetest.NewSyncChan(name) // trace of events local to node c1 := tracetest.NewSyncChan(name) // trace of events local to node
t.erouter.BranchNode(name, c1) t.erouter.BranchNode(name, c1)
t.checkTab[name] = tracetest.NewEventChecker(t.ttest, t.edispatch, c1) t.checkTab[name] = tracetest.NewEventChecker(t.TB, t.edispatch, c1)
// tracecheckers for events on links of all node1-node2 pairs // tracecheckers for events on links of all node1-node2 pairs
for name2 := range t.nodeTab { for name2 := range t.nodeTab {
...@@ -143,8 +155,8 @@ func (t *TestCluster) registerNewNode(name string) *tNode { ...@@ -143,8 +155,8 @@ func (t *TestCluster) registerNewNode(name string) *tNode {
// ----//---- node2 -> node1 send // ----//---- node2 -> node1 send
c21 := tracetest.NewSyncChan(name2 + "-" + name) c21 := tracetest.NewSyncChan(name2 + "-" + name)
t12 := tracetest.NewEventChecker(t.ttest, t.edispatch, c12) t12 := tracetest.NewEventChecker(t.TB, t.edispatch, c12)
t21 := tracetest.NewEventChecker(t.ttest, t.edispatch, c21) t21 := tracetest.NewEventChecker(t.TB, t.edispatch, c21)
t.erouter.BranchLink(name + "-" + name2, c12, c21) t.erouter.BranchLink(name + "-" + name2, c12, c21)
t.checkTab[name + "-" + name2] = t12 t.checkTab[name + "-" + name2] = t12
...@@ -167,22 +179,33 @@ func (t *TestCluster) registerNewNode(name string) *tNode { ...@@ -167,22 +179,33 @@ func (t *TestCluster) registerNewNode(name string) *tNode {
// XXX error of creating py process? // XXX error of creating py process?
func (t *TestCluster) NewMaster(name string) ITestMaster { func (t *TestCluster) NewMaster(name string) ITestMaster {
node := t.registerNewNode(name) node := t.registerNewNode(name)
return tNewMaster(t.name, ":1", node.net) m := tNewMaster(t.name, ":1", node.net)
// let tracer know how to map state addresses to node names
t.gotracer.RegisterNode(m.node, name)
return m
} }
func (t *TestCluster) NewStorage(name, masterAddr string, back storage.Backend) ITestStorage { func (t *TestCluster) NewStorage(name, masterAddr string, back storage.Backend) ITestStorage {
node := t.registerNewNode(name) node := t.registerNewNode(name)
return tNewStorage(t.name, masterAddr, ":1", node.net, back) s := tNewStorage(t.name, masterAddr, ":1", node.net, back)
t.gotracer.RegisterNode(s.node, name)
return s
} }
func (t *TestCluster) NewClient(name, masterAddr string) ITestClient { func (t *TestCluster) NewClient(name, masterAddr string) ITestClient {
node := t.registerNewNode(name) node := t.registerNewNode(name)
return newClient(t.name, masterAddr, node.net) c := newClient(t.name, masterAddr, node.net)
t.gotracer.RegisterNode(c.node, name)
return c
} }
// test-wrapper around Storage - to automatically listen by address, not provided listener. // tStorage is test-wrapper around Storage.
//
// - to automatically listen by address, not provided listener.
type tStorage struct { type tStorage struct {
*Storage *Storage
serveAddr string serveAddr string
...@@ -204,7 +227,7 @@ func (s *tStorage) Run(ctx context.Context) error { ...@@ -204,7 +227,7 @@ func (s *tStorage) Run(ctx context.Context) error {
return s.Storage.Run(ctx, l) return s.Storage.Run(ctx, l)
} }
// test-wrapper around Master // tMaster is test-wrapper around Master.
// //
// - automatically listens by address, not provided listener. // - automatically listens by address, not provided listener.
// - uses virtual clock. // - uses virtual clock.
......
...@@ -35,6 +35,12 @@ import ( ...@@ -35,6 +35,12 @@ import (
// NOTE to ease testing we use strings only to reprsent addresses or where // NOTE to ease testing we use strings only to reprsent addresses or where
// event happenned - not e.g. net.Addr or *NodeTab. // event happenned - not e.g. net.Addr or *NodeTab.
// xnet.TraceDial
// event: network dial starts
type eventNetDial struct {
Dialer, Addr string
}
// xnet.TraceConnect // xnet.TraceConnect
// event: network connection was made // event: network connection was made
type eventNetConnect struct { type eventNetConnect struct {
...@@ -92,6 +98,10 @@ func masterStartReady(where string, ready bool) *eventMStartReady { ...@@ -92,6 +98,10 @@ func masterStartReady(where string, ready bool) *eventMStartReady {
// ---- shortcuts ---- // ---- shortcuts ----
func netdial(dialer, addr string) *eventNetDial {
return &eventNetDial{Dialer: dialer, Addr: addr}
}
// shortcut for net connect event // shortcut for net connect event
func netconnect(src, dst, dialed string) *eventNetConnect { func netconnect(src, dst, dialed string) *eventNetConnect {
return &eventNetConnect{Src: src, Dst: dst, Dialed: dialed} return &eventNetConnect{Src: src, Dst: dst, Dialed: dialed}
...@@ -229,10 +239,20 @@ func (r *EventRouter) Route(event interface{}) (dst *tracetest.SyncChan) { ...@@ -229,10 +239,20 @@ func (r *EventRouter) Route(event interface{}) (dst *tracetest.SyncChan) {
defer r.mu.Unlock() defer r.mu.Unlock()
switch ev := event.(type) { switch ev := event.(type) {
default:
panic(fmt.Sprintf("event router: unexpected event %T", ev))
// networking // networking
case *eventNetListen: case *eventNetListen:
dst = r.byNode[host(ev.Laddr)] dst = r.byNode[host(ev.Laddr)]
case *eventNetDial:
link := ev.Dialer + "-" + host(ev.Addr)
ldst := r.byLink[link]
if ldst != nil {
dst = ldst.a
}
case *eventNetConnect: case *eventNetConnect:
link := host(ev.Src) + "-" + host(ev.Dst) link := host(ev.Src) + "-" + host(ev.Dst)
ldst := r.byLink[link] ldst := r.byLink[link]
......
...@@ -86,6 +86,12 @@ func (t *TraceCollector) RegisterNode(node *NodeApp, name string) { ...@@ -86,6 +86,12 @@ func (t *TraceCollector) RegisterNode(node *NodeApp, name string) {
t.clusterState2Owner[&node.ClusterState] = name t.clusterState2Owner[&node.ClusterState] = name
} }
func (t *TraceCollector) TraceNetDial(ev *xnet.TraceDial) {
t.d.Dispatch(&eventNetDial{
Dialer: ev.Dialer,
Addr: ev.Addr,
})
}
func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) { func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) {
t.d.Dispatch(&eventNetConnect{ t.d.Dispatch(&eventNetConnect{
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment