Commit 805e5c12 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 811d83a1
...@@ -25,10 +25,13 @@ import ( ...@@ -25,10 +25,13 @@ import (
"errors" "errors"
"flag" "flag"
"fmt" "fmt"
"reflect"
"time"
) )
var ( var (
chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels") chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
) )
// _Msg represents message with 1 event sent over _chan. // _Msg represents message with 1 event sent over _chan.
...@@ -81,11 +84,50 @@ func (ch *_chan) Close() { ...@@ -81,11 +84,50 @@ func (ch *_chan) Close() {
// Recv receives message from a producer. // Recv receives message from a producer.
// //
// The consumer, after dealing with the message, must send back an ack. // The consumer, after dealing with the message, must send back an ack.
// Must be called from main testing thread.
func (ch *_chan) Recv() *_Msg { func (ch *_chan) Recv() *_Msg {
msg := <-ch.msgq // XXX detect deadlock t := ch.t; t.Helper()
msg := ch.recv()
if msg == nil {
t.Fatalf("%s: recv: deadlock\n", ch.name)
}
return msg
}
// RecvInto receives message from a producer, verifies that event type is the
// same as type of *event, and saves received event there.
//
// Must be called from main testing thread.
func (ch *_chan) RecvInto(eventp interface{}) *_Msg {
t := ch.t; t.Helper()
msg := ch.recv()
if msg == nil {
t.Fatalf("%s: recv: deadlock waiting for %T\n", ch.name, eventp)
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
t.queuenak(msg, "unexpected event type")
t.Fatalf("%s: expect: %s: got %T %v", ch.name, reventp.Elem().Type(), msg.Event, msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg return msg
} }
func (ch *_chan) recv() *_Msg {
select {
case msg := <-ch.msgq:
return msg // ok
case <-time.After(*deadTime):
return nil // deadlock
}
}
// XXX -> Unpause? Cont? Continue? // XXX -> Unpause? Cont? Continue?
// Ack acknowledges the event was processed and unblocks producer goroutine. // Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *_Msg) Ack() { func (m *_Msg) Ack() {
......
...@@ -102,7 +102,6 @@ package tracetest ...@@ -102,7 +102,6 @@ package tracetest
// http://www.1024cores.net/home/relacy-race-detector/rrd-introduction // http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
import ( import (
"flag"
"fmt" "fmt"
"sort" "sort"
"strings" "strings"
...@@ -116,9 +115,11 @@ import ( ...@@ -116,9 +115,11 @@ import (
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
) )
/*
var ( var (
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock") deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
) )
*/
// _testing_TB is alias for testing.TB that is non-public when embedded into a struct. // _testing_TB is alias for testing.TB that is non-public when embedded into a struct.
type _testing_TB = testing.TB type _testing_TB = testing.TB
...@@ -136,7 +137,7 @@ type T struct { ...@@ -136,7 +137,7 @@ type T struct {
_testing_TB _testing_TB
mu sync.Mutex mu sync.Mutex
streamTab map[/*stream*/string]*_chan // set to nil on test shutdown streamTab map[/*stream*/string]*_chan // where events on stream are delivered; set to nil on test shutdown
routeEvent func(event interface{}) (stream string) routeEvent func(event interface{}) (stream string)
tracev []eventTrace // record of events as they happen tracev []eventTrace // record of events as they happen
delayInjectTab map[/*stream*/string]*delayInjectState delayInjectTab map[/*stream*/string]*delayInjectState
...@@ -146,7 +147,7 @@ type T struct { ...@@ -146,7 +147,7 @@ type T struct {
// eventTrace keeps information about one event T received via RxEvent. // eventTrace keeps information about one event T received via RxEvent.
type eventTrace struct { type eventTrace struct {
t time.Time // monotonic t time.Time // time of receive; monotonic
stream string stream string
event interface{} event interface{}
} }
...@@ -164,11 +165,6 @@ type nak struct { ...@@ -164,11 +165,6 @@ type nak struct {
why string why string
} }
// XXX place; just use t.Cleanup instead?
func (t *T) queuenak(msg *_Msg, why string) {
t.nakq = append(t.nakq, nak{msg, why})
}
// SetEventRouter tells t to which stream an event should go. // SetEventRouter tells t to which stream an event should go.
// //
...@@ -189,8 +185,6 @@ func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) { ...@@ -189,8 +185,6 @@ func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
// The sequential process of the test system where event originated should be // The sequential process of the test system where event originated should be
// paused until RxEvent returns. This requirement can be usually met via // paused until RxEvent returns. This requirement can be usually met via
// inserting t.RxEvent() call into the code that produces the event. // inserting t.RxEvent() call into the code that produces the event.
//
// XXX naming; NewEvent? IncomingEvent? OnEvent?
func (t *T) RxEvent(event interface{}) { func (t *T) RxEvent(event interface{}) {
t0 := time.Now() t0 := time.Now()
stream := "" stream := ""
...@@ -226,6 +220,21 @@ func (t *T) RxEvent(event interface{}) { ...@@ -226,6 +220,21 @@ func (t *T) RxEvent(event interface{}) {
ch.Send(event) ch.Send(event)
} }
// XXX Chan.Send
// XXX Chan.Close
// XXX Chan.Recv + RecvInto ?
// chanForStream returns channel corresponding to stream. // chanForStream returns channel corresponding to stream.
// must be called under mu. // must be called under mu.
func (t *T) chanForStream(stream string) *_chan { func (t *T) chanForStream(stream string) *_chan {
...@@ -294,26 +303,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg { ...@@ -294,26 +303,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg {
// XXX ch == nil -> no longer operational // XXX ch == nil -> no longer operational
var msg *_Msg return ch.RecvInto(eventp)
select {
case msg = <-ch.msgq: // unwrapped Recv
// ok
case <-time.After(*deadTime):
t.Fatalf("%s: recv: deadlock waiting for %T\n", stream, eventp)
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
t.queuenak(msg, "unexpected event type")
t.Fatalf("%s: expect: %s: got %T %v", ch.name, reventp.Elem().Type(), msg.Event, msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg
} }
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic. // fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
...@@ -544,3 +534,9 @@ func streamsOfTrace(tracev []eventTrace) []string { ...@@ -544,3 +534,9 @@ func streamsOfTrace(tracev []eventTrace) []string {
sort.Strings(streamv) sort.Strings(streamv)
return streamv return streamv
} }
// XXX place; just use t.Cleanup instead?
func (t *T) queuenak(msg *_Msg, why string) {
t.nakq = append(t.nakq, nak{msg, why})
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment