Commit 2ec2288e authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 454b6576
// Copyright (C) 2018 Nexedi SA and Contributors.
// Copyright (C) 2018-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -17,6 +17,104 @@
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.
// XXX explain
package tracetest_test
// TODO example for tracetest usage
//go:generate gotrace gen .
import (
"fmt"
"strings"
"sync"
"testing"
"lab.nexedi.com/kirr/go123/tracing"
"lab.nexedi.com/kirr/neo/go/internal/xtracing/tracetest"
)
// hi and hello are functions that emit "(Hi|Hello), <who>" and can be traced.
//trace:event traceHi(who string)
//trace:event traceHello(who string)
func hi(who string) {
traceHi(who)
fmt.Println("Hi,", who)
}
func hello(who string) {
traceHello(who)
fmt.Println("Hello,", who)
}
// TestTracetestExample demonstrates how to use tracetest to verify concurrent system with 2 threads.
func TestTracetestExample(t *testing.T) {
type eventHi string
type eventHello string
testf := func(t *tracetest.T) {
// setup tracing to deliver trace events to t XXX not to t, but to something dedicated?
pg := &tracing.ProbeGroup{}
tracing.Lock()
traceHi_Attach(pg, func(who string) {
// XXX NewEvent? IncomingEvent?
t.RxEvent(eventHi(who)) // XXX -> t.OnEvent? not via t?
})
traceHello_Attach(pg, func(who string) {
t.RxEvent(eventHello(who))
})
tracing.Unlock()
defer pg.Done()
// tell tracetest to which stream an event should go.
t.SetEventRouter(func(event interface{}) (stream string) {
// it can be only eventHi and eventHello.
// in this test the convention is that who comes as <threadID>·...
// we used threadID as stream.
who := ""
switch ev := event.(type) {
default:
panic(fmt.Sprintf("unexpected event type %T", event))
case eventHi:
who = string(ev)
case eventHello:
who = string(ev)
}
i := strings.Index(who, "·")
if i == -1 {
panic(fmt.Sprintf("who does not have threadID: %q", who))
}
return strings.ToLower(who[:i])
})
// run the workload
var wg sync.WaitGroup
defer wg.Wait()
wg.Add(2)
go func() { // thread1
defer wg.Done()
hi("T1·A")
hello("T1·B")
}()
go func() { // thread2
defer wg.Done()
hi("T2·C")
}()
// assert that events come as expected
t.Expect("t3", eventHi("T1·C"))
t.Expect("t1", eventHi("T1·A"))
t.Expect("t1", eventHello("T1·B"))
// XXX also t.Recv, t.ExpectNoAck ?
//t1 := t.OnStream("t1")
//t1.Expect(eventHi("X·A"))
}
tracetest.Verify(t, testf)
}
......@@ -46,13 +46,14 @@
//
// The package should be used as follows:
//
// XXX tracer -> trace collector?
// - implement tracer that will be synchronously collecting events from
// execution of your program. This can be done with package
// lab.nexedi.com/kirr/go123/tracing or by other similar means.
//
// the tracer have to output events to dispatcher (see below).
//
// - implement router that will be making decisions specific to your
// - implement router that will be making decisions specific to your XXX -> Streams
// particular testing scenario on to which stream an event should belong.
//
// the router will be consulted by dispatcher (see below) for its working.
......@@ -78,15 +79,21 @@
package tracetest
import (
"errors"
"flag"
"fmt"
"sort"
"strings"
"sync"
"reflect"
"runtime"
"runtime/debug"
"testing"
"time"
"github.com/kylelemons/godebug/pretty"
// "lab.nexedi.com/kirr/go123/xruntime"
)
var (
......@@ -133,13 +140,14 @@ type ChanRx interface {
// The goroutine which sent the message will wait for Ack before continue.
type Msg struct {
Event interface {}
ack chan<- bool // XXX -> chan<- error and abort tested goroutine if error?
ack chan<- error // nil on Ack; !nil on Nak
}
// _chan implements Chan.
type _chan struct {
t *T
msgq chan *Msg
_name string
}
......@@ -154,11 +162,11 @@ func (ch *_chan) Send(event interface{}) {
if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name(), event, event)
}
ack := make(chan bool)
ack := make(chan error)
ch.msgq <- &Msg{event, ack}
ok := <-ack
if !ok {
panicf("%s: send: deadlock", ch.name())
err := <-ack
if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name(), err)
}
}
......@@ -173,10 +181,15 @@ func (ch *_chan) _rxq() <-chan *Msg {
return ch.msgq
}
// XXX -> Unpause?
// XXX -> Unpause? Cont? Continue?
// XXX Ack(err)? or add Nak(err)?
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *Msg) Ack() {
m.ack <- true
m.ack <- nil
}
func (m *Msg) Nak(why string) {
m.ack <- errors.New(why)
}
// NewChan creates new Chan channel.
......@@ -322,7 +335,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n")
} else {
bad += fmt.Sprintf("there are %d sender(s) on other channel(s):\n", len(sendv))
bad += fmt.Sprintf("there are %d pending sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name(), __.event, __.event)
}
......@@ -348,8 +361,6 @@ type EventRouter interface {
}
*/
// XXX -> T
type T = EventDispatcher
// EventDispatcher dispatches events to appropriate Chan for checking
// according to provided streams.
type EventDispatcher struct {
......@@ -450,6 +461,248 @@ type Streams interface {
RouteEvent(event interface{}) Chan
}
// T is similar to testing.T and is passed by Verify to tested function.
//
// See top-level package documentation for details.
type T struct {
testing.TB
streamTabMu sync.Mutex
streamTab map[/*stream*/string]Chan
routeEvent func(event interface{}) (stream string)
}
// XXX doc; naming
func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
t.streamTabMu.Lock()
defer t.streamTabMu.Unlock()
if t.routeEvent != nil {
panic("double call to SetEventRouter")
}
t.routeEvent = routeEvent
}
// XXX doc; naming
func (t *T) RxEvent(event interface{}) {
t.streamTabMu.Lock()
ch := t.chanForEvent(event)
t.streamTabMu.Unlock()
if ch == nil {
t.fatalfInNonMain("test is no longer operational")
}
// TODO it is possible to empirically detect here if a test incorrectly
// decomposed its system into serial streams: consider unrelated to each
// other events A and B are incorrectly routed to the same channel. It
// could be so happening that the order of checks on the test side is
// almost always correct and so the error is not visible. However
//
// if we add delays to delivery of either A or B
// and test both combinations
//
// we will for sure detect the error as, if A and B are indeed
// unrelated, one of the delay combination will result in events
// delivered to test in different to what it expects order.
//
// the time for delay could be taken as follows:
//
// - run the test without delay; collect δt between events on particular stream
// - take delay = max(δt)·10
//
// to make sure there is indeed no different orderings possible on the
// stream, rerun the test N(event-on-stream) times, and during i'th run
// delay i'th event.
//
// TODO See also on this topic:
// http://www.1024cores.net/home/relacy-race-detector
// http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
// TODO timeout: deadlock? (print all-in-flight events on timeout)
// XXX or better ^^^ to do on receiver side?
ch.Send(event)
}
// chanForEvent returns channel corresponding to stream where event has to be delivered.
// must be called under streamTabMu.
func (t *T) chanForEvent(event interface{}) Chan {
stream := "default"
if t.routeEvent != nil {
stream = t.routeEvent(event)
}
return t.chanForStream(stream)
}
// chanForStream returns channel corresponding to stream.
// must be called under streamTabMu.
func (t *T) chanForStream(stream string) Chan {
if t.streamTab == nil {
return nil // t is no longer operational after e.g. deadlock
}
ch, ok := t.streamTab[stream]
if !ok {
ch = NewChan(stream)
ch.(*_chan).t = t // XXX move into NewChan
t.streamTab[stream] = ch
}
return ch
}
// Expect receives next event on stream and verifies it to be equal to eventOK.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (t *T) Expect(stream string, eventOK interface{}) {
t.Helper()
msg := t.expect1(stream, eventOK)
msg.Ack()
}
// XXX ExpectNoACK
// XXX Recv
// expect1 receives next event on stream and verifies it to be equal to eventOK (both type and value).
//
// if checks do not pass - fatal testing error is raised.
func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := t.xget1(stream, reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
msg.Nak("expect failed")
t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s",
stream,
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
}
// xget1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
func (t *T) xget1(stream string, eventp interface{}) *Msg {
t.Helper()
t.streamTabMu.Lock()
ch := t.chanForStream(stream)
t.streamTabMu.Unlock()
// XXX ch == nil -> no longer operational
var msg *Msg
select {
case msg = <-ch._rxq(): // unwrapped Recv
// ok
case <-time.After(*deadTime):
t.deadlock(stream, eventp)
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
// msg.nak <- error
t.Fatalf("%s: expect: %s: got %#v", ch.name(), reventp.Elem().Type(), msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg
}
// deadlock reports diagnostic when retrieving event from stream times out.
//
// Timing out on recv means there is a deadlock either if no event was sent at
// all, or some other event was sent to another channel/checker.
//
// Report the full picture - what was expected and what was sent where.
func (t *T) deadlock(stream string, eventp interface{}) {
t.Helper()
// mark streamTab no longer operational XXX ok?
t.streamTabMu.Lock()
streamTab := t.streamTab
t.streamTab = nil
t.streamTabMu.Unlock()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", stream, eventp)
type sendInfo struct{ch Chan; msg *Msg}
var sendv []sendInfo
for _, ch := range streamTab {
// check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
select {
case msg := <-ch._rxq():
sendv = append(sendv, sendInfo{ch, msg})
default:
}
// XXX panic triggering disabled because if sender panics we have no chance to continue
// TODO retest this
// in any case close channel where future Sends may arrive so that will panic too.
//close(dst.msgq)
}
// order channels by name
sort.Slice(sendv, func(i, j int) bool {
return strings.Compare(sendv[i].ch.name(), sendv[j].ch.name()) < 0
})
if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n")
} else {
bad += fmt.Sprintf("there are %d pending sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.ch.name(), __.msg.Event, __.msg.Event)
}
}
// log the deadlock details and nak all senders.
// nak them only after deadlock printout, so that the deadlock text
// comes first, and their "panics" don't get intermixed with it.
t.Log(bad)
for _, __ := range sendv {
__.msg.Nak("deadlock")
}
t.FailNow()
}
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
var fatalLogMu sync.Mutex
func (t *T) fatalfInNonMain(format string, argv ...interface{}) {
t.Helper()
// serialized fatal log+traceback printout, so that such printouts from
// multiple goroutines do not get intermixed.
fatalLogMu.Lock()
defer fatalLogMu.Unlock()
t.Logf(format, argv...)
// XXX don't panic - it will stop the process and prevent the
// main goroutine to print detailed reason for e.g. deadlock or
// other error.
// XXX +traceback
t.Logf("%s\n", debug.Stack())
runtime.Goexit()
}
// TODO func Verify(system)
// system() ->:
......@@ -462,14 +715,18 @@ type Streams interface {
// Verify verifies a system.
// XXX
func Verify(system func() (
Streams,
/*main*/ func(),
/*testf*/ func(t *T),
)) {
func Verify(t testing.TB, testf func(t *T)) {
tT := &T{TB: t, streamTab: make(map[string]Chan)}
// XXX
}
testf(tT)
// XXX in the end: verify that no events are left unchecked /
// unconsumed (e.g. sent to Dispatch, but not received).
// XXX in the end: verify that streams are the same from run to run (if completed successfully).
}
// ---- misc ----
......
// Code generated by lab.nexedi.com/kirr/go123/tracing/cmd/gotrace; DO NOT EDIT.
package tracetest_test
// code generated for tracepoints
import (
"lab.nexedi.com/kirr/go123/tracing"
"unsafe"
)
// traceevent: traceHello(who string)
type _t_traceHello struct {
tracing.Probe
probefunc func(who string)
}
var _traceHello *_t_traceHello
func traceHello(who string) {
if _traceHello != nil {
_traceHello_run(who)
}
}
func _traceHello_run(who string) {
for p := _traceHello; p != nil; p = (*_t_traceHello)(unsafe.Pointer(p.Next())) {
p.probefunc(who)
}
}
func traceHello_Attach(pg *tracing.ProbeGroup, probe func(who string)) *tracing.Probe {
p := _t_traceHello{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceHello)), &p.Probe)
return &p.Probe
}
// traceevent: traceHi(who string)
type _t_traceHi struct {
tracing.Probe
probefunc func(who string)
}
var _traceHi *_t_traceHi
func traceHi(who string) {
if _traceHi != nil {
_traceHi_run(who)
}
}
func _traceHi_run(who string) {
for p := _traceHi; p != nil; p = (*_t_traceHi)(unsafe.Pointer(p.Next())) {
p.probefunc(who)
}
}
func traceHi_Attach(pg *tracing.ProbeGroup, probe func(who string)) *tracing.Probe {
p := _t_traceHi{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceHi)), &p.Probe)
return &p.Probe
}
// trace export signature
func _trace_exporthash_51bb0086e9435e499919853bae2dbb429f70d833() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment