Commit cab8b455 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5a3f6a12
......@@ -37,7 +37,7 @@ import (
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/go123/exc"
"lab.nexedi.com/kirr/go123/tracing"
// "lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
"lab.nexedi.com/kirr/go123/xsync"
......@@ -47,7 +47,10 @@ import (
// M drives cluster with 1 S & C through recovery -> verification -> service -> shutdown
func TestMasterStorage(t0 *testing.T) {
func TestMasterStorage(t *testing.T) {
tracetest.Verify(t, tracetestMasterStorage)
}
func tracetestMasterStorage(t0 *tracetest.T) {
X := exc.Raiseif
t := tNewCluster(t0, "abc1")
......@@ -436,96 +439,98 @@ func (d tdispatch1) Dispatch(event interface{}) {
}
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
// create test cluster <- XXX factor to utility func
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
ctx, cancel := context.WithCancel(context.Background())
wg := xsync.NewWorkGroup(ctx)
defer wg.Wait()
defer cancel()
// spawn M
M := tNewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
cG := tracetest.NewChan("main")
tG := tracetest.NewEventChecker(b, nil /* XXX */, cG)
tracer := NewTraceCollector(tdispatch1{cG})
tracer.RegisterNode(M.node, "m")
tracing.Lock()
pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
traceMasterStartReady_Attach(tracer.pg, tracer.traceMasterStartReady)
tracing.Unlock()
wg.Go(func(ctx context.Context) error {
return M.Run(ctx)
})
// determining M serving address XXX better with M api
ev := cG.Recv()
mnode, ok := ev.Event.(*eventNodeTab)
if !ok {
b.Fatalf("after M start: got %T ; want eventNodeTab", ev.Event)
}
Maddr := mnode.NodeInfo.Addr.String()
tracing.Lock()
pnode.Detach()
tracing.Unlock()
ev.Ack()
// now after we know Maddr create S & C and start S serving
S := tNewStorage("abc1", Maddr, "", Snet, zback)
wg.Go(func(ctx context.Context) error {
return S.Run(ctx)
})
C := NewClient("abc1", Maddr, Cnet)
wg.Go(func(ctx context.Context) error {
return C.Run(ctx)
})
// command M to start
tG.Expect(masterStartReady("m", true)) // <- XXX better with M api
tracer.Detach()
err := M.Start()
if err != nil {
b.Fatal(err)
}
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
obj1, err := zback.Load(ctx, xid1)
if err != nil {
b.Fatal(err)
}
buf1, serial1 := obj1.Data, obj1.Serial
// C.Load(xid1)
xcload1 := func() {
cbuf1, cserial1, err := C.Load(ctx, xid1)
if err != nil {
b.Fatal(err)
}
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
}
cbuf1.Release()
}
// do first C.Load - this also implicitly waits for M & S to come up
// and C to connect to M and S.
xcload1()
// now start the benchmark
b.ResetTimer()
benchit(xcload1)
return
// XXX restore
// // create test cluster <- XXX factor to utility func
// zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
//
// ctx, cancel := context.WithCancel(context.Background())
// wg := xsync.NewWorkGroup(ctx)
// defer wg.Wait()
// defer cancel()
//
// // spawn M
// M := tNewMaster("abc1", "", Mnet)
//
// // XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
// cG := tracetest.NewChan("main")
// tG := tracetest.NewEventChecker(b, nil /* XXX */, cG)
//
// tracer := NewTraceCollector(tdispatch1{cG})
// tracer.RegisterNode(M.node, "m")
//
// tracing.Lock()
// pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
// traceMasterStartReady_Attach(tracer.pg, tracer.traceMasterStartReady)
// tracing.Unlock()
//
// wg.Go(func(ctx context.Context) error {
// return M.Run(ctx)
// })
//
// // determining M serving address XXX better with M api
// ev := cG.Recv()
// mnode, ok := ev.Event.(*eventNodeTab)
// if !ok {
// b.Fatalf("after M start: got %T ; want eventNodeTab", ev.Event)
// }
// Maddr := mnode.NodeInfo.Addr.String()
//
// tracing.Lock()
// pnode.Detach()
// tracing.Unlock()
// ev.Ack()
//
// // now after we know Maddr create S & C and start S serving
// S := tNewStorage("abc1", Maddr, "", Snet, zback)
// wg.Go(func(ctx context.Context) error {
// return S.Run(ctx)
// })
//
// C := NewClient("abc1", Maddr, Cnet)
// wg.Go(func(ctx context.Context) error {
// return C.Run(ctx)
// })
//
// // command M to start
// tG.Expect(masterStartReady("m", true)) // <- XXX better with M api
// tracer.Detach()
//
// err := M.Start()
// if err != nil {
// b.Fatal(err)
// }
//
// xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
//
// obj1, err := zback.Load(ctx, xid1)
// if err != nil {
// b.Fatal(err)
// }
// buf1, serial1 := obj1.Data, obj1.Serial
//
// // C.Load(xid1)
// xcload1 := func() {
// cbuf1, cserial1, err := C.Load(ctx, xid1)
// if err != nil {
// b.Fatal(err)
// }
//
// if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
// b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
// }
//
// cbuf1.Release()
// }
//
// // do first C.Load - this also implicitly waits for M & S to come up
// // and C to connect to M and S.
// xcload1()
//
// // now start the benchmark
// b.ResetTimer()
//
// benchit(xcload1)
}
func benchmarkGetObjectSerial(b *testing.B, Mnet, Snet, Cnet xnet.Networker) {
......
......@@ -24,7 +24,7 @@ import (
"context"
"fmt"
"sync"
"testing"
// "testing"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xnet/pipenet"
......@@ -39,7 +39,7 @@ import (
// tCluster is a test NEO cluster NEO.
// Create it with tNewCluster.
type tCluster struct {
testing.TB // original testing env this cluster was created at
*tracetest.T // original testing env this cluster was created at
name string // name of the cluster
network *pipenet.Network // nodes interoperate via netowrk XXX -> lo
......@@ -48,18 +48,18 @@ type tCluster struct {
//tpy *PyTracer // for tracing py nodes
erouter *EventRouter
edispatch *tracetest.EventDispatcher
// edispatch *tracetest.EventDispatcher
tabMu sync.Mutex
nodeTab map[string/*node*/]*tNode
// checkTab keeps event checkers for the following event streams:
// 1) events specific to a node, and
// 2) events specific to exchange in between node1-node2 for
// communications that originate at node1.
//
// NOTE that node1-node2 and node2-node1 are different and come with
// separate event checkers.
checkTab map[string/*node, or node1->node2*/]*tracetest.EventChecker
// // checkTab keeps event checkers for the following event streams:
// // 1) events specific to a node, and
// // 2) events specific to exchange in between node1-node2 for
// // communications that originate at node1.
// //
// // NOTE that node1-node2 and node2-node1 are different and come with
// // separate event checkers.
// checkTab map[string/*node, or node1->node2*/]*tracetest.EventChecker
}
// tNode represents information about a test node ... XXX
......@@ -88,21 +88,22 @@ type ITestClient interface {
// XXX ...
//
// XXX defer t.Stop()
func tNewCluster(ttest testing.TB, name string) *tCluster {
func tNewCluster(ttest *tracetest.T, name string) *tCluster {
t := &tCluster{
name: name,
network: pipenet.New("testnet"), // test network
nodeTab: make(map[string]*tNode),
checkTab: make(map[string]*tracetest.EventChecker),
// checkTab: make(map[string]*tracetest.EventChecker),
//... XXX
TB: ttest,
T: ttest,
}
t.erouter = NewEventRouter()
t.edispatch = tracetest.NewEventDispatcher(t.erouter)
t.gotracer = NewTraceCollector(t.edispatch)
// t.edispatch = tracetest.NewEventDispatcher(t.erouter)
t.gotracer = NewTraceCollector(ttest)
ttest.SetEventRouter(t.erouter.RouteEvent)
t.gotracer.Attach()
return t
......@@ -125,6 +126,21 @@ func (t *tCluster) Stop() error {
//
// name might be of "node" or "node1-node2" kind. XXX more text
// node or node1/node2 must be already registered.
//
// XXX is this a good idea?
// XXX move into tracetest.T? (e.g. T.Stream())
func (t *tCluster) Checker(name string) *tStreamChecker {
return &tStreamChecker{t: t, stream: name}
}
type tStreamChecker struct {
t *tCluster
stream string
}
func (c *tStreamChecker) Expect(event interface{}) {
c.t.Helper()
c.t.Expect(c.stream, event)
}
/*
func (t *tCluster) Checker(name string) *tracetest.EventChecker {
t.tabMu.Lock()
defer t.tabMu.Unlock()
......@@ -136,6 +152,7 @@ func (t *tCluster) Checker(name string) *tracetest.EventChecker {
return c
}
*/
// registerNewNode registers new node with given name.
//
......@@ -152,23 +169,23 @@ func (t *tCluster) registerNewNode(name string) *tNode {
}
// tracechecker for events on node
c1 := tracetest.NewChan(name) // trace of events local to node
t.erouter.BranchNode(name, c1)
t.checkTab[name] = tracetest.NewEventChecker(t.TB, t.edispatch, c1)
// c1 := tracetest.NewChan(name) // trace of events local to node
t.erouter.BranchNode(name, name)
// t.checkTab[name] = tracetest.NewEventChecker(t.TB, t.edispatch, c1)
// tracecheckers for events on links of all node1-node2 pairs
for name2 := range t.nodeTab {
// trace of events with cause root being node1 -> node2 send
c12 := tracetest.NewChan(name + "-" + name2)
// c12 := tracetest.NewChan(name + "-" + name2)
// ----//---- node2 -> node1 send
c21 := tracetest.NewChan(name2 + "-" + name)
t12 := tracetest.NewEventChecker(t.TB, t.edispatch, c12)
t21 := tracetest.NewEventChecker(t.TB, t.edispatch, c21)
// c21 := tracetest.NewChan(name2 + "-" + name)
//
// t12 := tracetest.NewEventChecker(t.TB, t.edispatch, c12)
// t21 := tracetest.NewEventChecker(t.TB, t.edispatch, c21)
t.erouter.BranchLink(name + "-" + name2, c12, c21)
t.checkTab[name + "-" + name2] = t12
t.checkTab[name2 + "-" + name] = t21
t.erouter.BranchLink(name + "-" + name2, name+"-"+name2, name2+"-"+name)
// t.checkTab[name + "-" + name2] = t12
// t.checkTab[name2 + "-" + name] = t21
}
node := &tNode{}
......
......@@ -25,7 +25,7 @@ import (
"net"
"sync"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
// "lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
......@@ -157,12 +157,12 @@ func δnode(where string, laddr string, typ proto.NodeType, num int32, state pro
type EventRouter struct {
mu sync.Mutex
// XXX all unbranched events go to here?
defaultq tracetest.Chan
// // XXX all unbranched events go to here?
// defaultq tracetest.Chan
// events specific to particular node - e.g. node starts listening,
// state on that node changes, etc...
byNode map[string /*host*/]tracetest.Chan
byNode map[string /*host*/]string /*stream*/
// state on host changes. Takes precendece over byNode.
//
......@@ -172,7 +172,7 @@ type EventRouter struct {
// byNode("C") and simply verify events on tMC and then tC in that order.
// keeping events local to C on tC, not tMC helps TestCluster to
// organize trace channels in uniform way )
byState map[string /*host*/]tracetest.Chan
byState map[string /*host*/]string /*stream*/
// event on a-b link
byLink map[string /*host-host*/]*linkDst
......@@ -187,21 +187,22 @@ type EventRouter struct {
// Events go to either a or b depending on which side initiated particular
// connection on top of the link.
type linkDst struct {
a tracetest.Chan // net cause was on dialer
b tracetest.Chan // net cause was on listener
a /*stream*/string // net cause was on dialer
b /*stream*/string // net cause was on listener
}
func NewEventRouter() *EventRouter {
return &EventRouter{
defaultq: tracetest.NewChan("default"),
byNode: make(map[string]tracetest.Chan),
byState: make(map[string]tracetest.Chan),
// defaultq: tracetest.NewChan("default"),
byNode: make(map[string]string),
byState: make(map[string]string),
byLink: make(map[string]*linkDst),
connected: make(map[string]bool),
}
}
/*
func (r *EventRouter) AllStreams() []tracetest.Chan {
rtset := map[tracetest.Chan]int{}
rtset[r.defaultq] = 1
......@@ -222,6 +223,7 @@ func (r *EventRouter) AllStreams() []tracetest.Chan {
}
return rtv
}
*/
// hostport splits addr of for "host:port" into host and port.
//
......@@ -245,7 +247,8 @@ func host(addr string) string {
}
// Route routes events according to rules specified via Branch*().
func (r *EventRouter) RouteEvent(event interface{}) (dst tracetest.Chan) {
//func (r *EventRouter) RouteEvent(event interface{}) (dst tracetest.Chan) {
func (r *EventRouter) RouteEvent(event interface{}) (dst/*stream*/ string) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -322,17 +325,14 @@ func (r *EventRouter) RouteEvent(event interface{}) (dst tracetest.Chan) {
dst = r.routeState(ev.Where)
}
if dst == nil {
dst = r.defaultq
}
return dst
}
// routeState routes event corresponding to state change on host
func (r *EventRouter) routeState(host string) (dst tracetest.Chan) {
func (r *EventRouter) routeState(host string) (dst string) {
// lookup dst by state rules
dst = r.byState[host]
if dst != nil {
if dst != "" {
return dst
}
......@@ -341,7 +341,7 @@ func (r *EventRouter) routeState(host string) (dst tracetest.Chan) {
}
// BranchNode branches events corresponding to host.
func (r *EventRouter) BranchNode(host string, dst tracetest.Chan) {
func (r *EventRouter) BranchNode(host string, dst string) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -355,7 +355,7 @@ func (r *EventRouter) BranchNode(host string, dst tracetest.Chan) {
// BranchState branches events corresponding to state changes on host.
//
// XXX not needed?
func (r *EventRouter) BranchState(host string, dst tracetest.Chan) {
func (r *EventRouter) BranchState(host string, dst string) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -375,7 +375,7 @@ func (r *EventRouter) BranchState(host string, dst tracetest.Chan) {
// networking cause root coming from b - go to dstb.
//
// XXX extend to support multiple ongoing streams (e.g. prefetch) ?
func (r *EventRouter) BranchLink(link string, dsta, dstb tracetest.Chan) {
func (r *EventRouter) BranchLink(link string, dsta, dstb string) {
r.mu.Lock()
defer r.mu.Unlock()
......
......@@ -30,22 +30,24 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// GoTraceCollector collects events from NEO/go trace points and sends them to event dispatcher.
//
// TraceCollector connects to NEO-specific trace points via probes and sends events to dispatcher.
//
// XXX naming -> GoTracer (and PyTracer for NEO/py)
type TraceCollector struct {
pg *tracing.ProbeGroup
d interface { Dispatch(interface{}) }
rx interface { RxEvent(interface{}) }
node2Name map[*NodeApp]string
nodeTab2Owner map[*NodeTable]string
clusterState2Owner map[*proto.ClusterState]string
}
func NewTraceCollector(dispatch interface { Dispatch(interface{}) }) *TraceCollector {
func NewTraceCollector(rx interface { RxEvent(interface{}) }) *TraceCollector {
return &TraceCollector{
pg: &tracing.ProbeGroup{},
d: dispatch,
rx: rx,
node2Name: make(map[*NodeApp]string),
nodeTab2Owner: make(map[*NodeTable]string),
......@@ -87,14 +89,14 @@ func (t *TraceCollector) RegisterNode(node *NodeApp, name string) {
}
func (t *TraceCollector) TraceNetDial(ev *xnet.TraceDial) {
t.d.Dispatch(&eventNetDial{
t.rx.RxEvent(&eventNetDial{
Dialer: ev.Dialer,
Addr: ev.Addr,
})
}
func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) {
t.d.Dispatch(&eventNetConnect{
t.rx.RxEvent(&eventNetConnect{
Src: ev.Src.String(),
Dst: ev.Dst.String(),
Dialed: ev.Dialed,
......@@ -102,29 +104,29 @@ func (t *TraceCollector) TraceNetConnect(ev *xnet.TraceConnect) {
}
func (t *TraceCollector) TraceNetListen(ev *xnet.TraceListen) {
t.d.Dispatch(&eventNetListen{Laddr: ev.Laddr.String()})
t.rx.RxEvent(&eventNetListen{Laddr: ev.Laddr.String()})
}
func (t *TraceCollector) TraceNetTx(ev *xnet.TraceTx) {} // we use traceNeoMsgSend instead
func (t *TraceCollector) traceNeoMsgSendPre(l *neonet.NodeLink, connID uint32, msg proto.Msg) {
t.d.Dispatch(&eventNeoSend{l.LocalAddr().String(), l.RemoteAddr().String(), connID, msg})
t.rx.RxEvent(&eventNeoSend{l.LocalAddr().String(), l.RemoteAddr().String(), connID, msg})
}
func (t *TraceCollector) traceClusterState(cs *proto.ClusterState) {
//t.d.Dispatch(&eventClusterState{cs, *cs})
//t.rx.RxEvent(&eventClusterState{cs, *cs})
where := t.clusterState2Owner[cs]
t.d.Dispatch(&eventClusterState{where, *cs})
t.rx.RxEvent(&eventClusterState{where, *cs})
}
func (t *TraceCollector) traceNode(nt *NodeTable, n *Node) {
//t.d.Dispatch(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
//t.rx.RxEvent(&eventNodeTab{unsafe.Pointer(nt), n.NodeInfo})
where := t.nodeTab2Owner[nt]
t.d.Dispatch(&eventNodeTab{where, n.NodeInfo})
t.rx.RxEvent(&eventNodeTab{where, n.NodeInfo})
}
func (t *TraceCollector) traceMasterStartReady(m *Master, ready bool) {
//t.d.Dispatch(masterStartReady(m, ready))
//t.rx.RxEvent(masterStartReady(m, ready))
where := t.node2Name[m.node]
t.d.Dispatch(&eventMStartReady{where, ready})
t.rx.RxEvent(&eventMStartReady{where, ready})
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment