Commit af6b146c authored by Kirill Smelkov's avatar Kirill Smelkov


parent 3055a942
......@@ -808,34 +808,41 @@ func (t *T) closeStreamTab() (nnak int) {
// - main func to run the system
// - testf (serial) func to verify events that comes from channels
// Verify verifies a test system.
// XXX
// XXX try back -> testing.TB? (but TB does not have .Run)
func Verify(t *testing.T, f func(t *T)) {
testf := func(t testing.TB, delayInjectTab map[string]*delayInjectState) *T {
tT := &T{
_testing_TB: t,
streamTab: make(map[string]_Chan),
delayInjectTab: delayInjectTab,
// verify in the end that no events are left unchecked / unconsumed,
// e.g. sent to RxEvent, but not received. Nak them if they are and fail.
// NOTE this complements T.Fatal and friends, because a test might
// think it completes successfully, but leaves unconsumed events behind it.
defer func() {
nnak := tT.closeStreamTab()
if nnak != 0 {
// Run runs f under tracing.
// It is similar to Verify but f is ran only once.
func Run(t testing.TB, f func(t *T)) {
run(t, f, nil)
return tT
// run serves Run and Verify: it creates T that wraps t, and runs f under T.
func run(t testing.TB, f func(t *T), delayInjectTab map[string]*delayInjectState) *T {
tT := &T{
_testing_TB: t,
streamTab: make(map[string]_Chan),
delayInjectTab: delayInjectTab,
tT0 := testf(t, nil)
// verify in the end that no events are left unchecked / unconsumed,
// e.g. sent to RxEvent, but not received. Nak them if they are and fail.
// NOTE this complements T.Fatal and friends, because a test might
// think it completes successfully, but leaves unconsumed events behind it.
defer func() {
nnak := tT.closeStreamTab()
if nnak != 0 {
return tT
// Verify verifies a test system.
// XXX
func Verify(t *testing.T, f func(t *T)) {
tT0 := run(t, f, nil)
// now, if f succeeds, verify f with injected delays.
// XXX explain with more text
......@@ -882,7 +889,7 @@ func Verify(t *testing.T, f func(t *T)) {
t.Run(fmt.Sprintf("delay@%d(=%s:%d)", i, stream, istream), func(t *testing.T) {
tT := testf(t, map[string]*delayInjectState{
tT := run(t, f, map[string]*delayInjectState{
stream: &delayInjectState{
delayAt: istream,
delayT: delayT,
......@@ -366,94 +366,49 @@ func (d tdispatch1) Dispatch(event interface{}) {
func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit func(xcload1 func())) {
X := exc.Raiseif
// create test cluster
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
var t0 *tracetest.T // XXX stub
t := tNewCluster_MS(t0, "abc1", zback)
defer t.Stop()
M := t.Master("m")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
wg := xsync.NewWorkGroup(ctx)
defer wg.Wait()
// spawn M
M := tNewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
cG := tracetest.NewChan("main")
tG := tracetest.NewEventChecker(b, nil , cG)
tracer := NewTraceCollector(tdispatch1{cG})
tracer.RegisterNode(M.node, "m")
pnode := traceNodeChanged_Attach(nil, tracer.traceNode)
traceMasterStartReady_Attach(, tracer.traceMasterStartReady)
wg.Go(func(ctx context.Context) error {
return M.Run(ctx)
// determining M serving address XXX better with M api
ev := cG.Recv()
mnode, ok := ev.Event.(*eventNodeTab)
if !ok {
b.Fatalf("after M start: got %T ; want eventNodeTab", ev.Event)
Maddr := mnode.NodeInfo.Addr.String()
// now after we know Maddr create S & C and start S serving
S := tNewStorage("abc1", Maddr, "", Snet, zback)
wg.Go(func(ctx context.Context) error {
return S.Run(ctx)
err := M.Start(); X(err)
C := t.NewClient("c", "m:1")
tracetest.Run(b, func(t0 *tracetest.T) {
// create test cluster
zback := xfs1back("../zodb/storage/fs1/testdata/1.fs")
t := tNewCluster_MS(t0, "abc1", zback)
defer t.Stop()
M := t.Master("m")
err := M.Start(); X(err)
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
C := t.NewClient("c", "m:1")
obj1, err := zback.Load(ctx, xid1)
if err != nil {
buf1, serial1 := obj1.Data, obj1.Serial
xid1 := zodb.Xid{Oid: 1, At: zodb.TidMax}
// C.Load(xid1)
xcload1 := func() {
cbuf1, cserial1, err := C.Load(ctx, xid1)
obj1, err := zback.Load(t.runCtx, xid1) // XXX -> t.Ctx ?
if err != nil {
buf1, serial1 := obj1.Data, obj1.Serial
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
t.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
// C.Load(xid1)
xcload1 := func() {
cbuf1, cserial1, err := C.Load(t.runCtx, xid1) // XXX -> t.Ctx ?
if err != nil {
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
t.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
// do first C.Load - this also implicitly waits for M & S to come up
// and C to connect to M and S.
// do first C.Load - this also implicitly waits for M & S to come up
// and C to connect to M and S.
// now start the benchmark
// now start the benchmark
func benchmarkGetObjectSerial(b *testing.B, Mnet, Snet, Cnet xnet.Networker) {
......@@ -61,7 +61,7 @@ type tCluster struct {
nodeTab map[string/*node*/]*tNode
runCtx context.Context // runCtx is canceled on .Stop()
runCtx context.Context // runCtx is canceled on .Stop() TODO or test failure XXX -> .ctx?
runWG *xsync.WorkGroup // started nodes are .Run() under runWG
runCancel func()
Markdown is supported
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment