Commit decaff6f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f4d307b3
...@@ -490,13 +490,21 @@ type Streams interface { ...@@ -490,13 +490,21 @@ type Streams interface {
type T struct { type T struct {
testing.TB testing.TB
streamTabMu sync.Mutex mu sync.Mutex
streamTab map[/*stream*/string]Chan // set to nil on test shutdown streamTab map[/*stream*/string]Chan // set to nil on test shutdown
routeEvent func(event interface{}) (stream string) routeEvent func(event interface{}) (stream string)
tracev []eventTrace // record of events as they happen
nakq []nak // naks queued to be sent after Fatal nakq []nak // naks queued to be sent after Fatal
} }
// eventTrace keeps information about one event T received via RxEvent.
type eventTrace struct {
t time.Time // monotonic
stream string
event interface{}
}
type nak struct { type nak struct {
msg *Msg msg *Msg
why string why string
...@@ -510,8 +518,8 @@ func (t *T) queuenak(msg *Msg, why string) { ...@@ -510,8 +518,8 @@ func (t *T) queuenak(msg *Msg, why string) {
// XXX doc; naming // XXX doc; naming
func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) { func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
t.streamTabMu.Lock() t.mu.Lock()
defer t.streamTabMu.Unlock() defer t.mu.Unlock()
if t.routeEvent != nil { if t.routeEvent != nil {
panic("double call to SetEventRouter") panic("double call to SetEventRouter")
...@@ -521,13 +529,15 @@ func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) { ...@@ -521,13 +529,15 @@ func (t *T) SetEventRouter(routeEvent func(event interface{}) (stream string)) {
// XXX doc; naming // XXX doc; naming
func (t *T) RxEvent(event interface{}) { func (t *T) RxEvent(event interface{}) {
t0 := time.Now()
stream := "default" stream := "default"
t.streamTabMu.Lock() t.mu.Lock()
if t.routeEvent != nil { if t.routeEvent != nil {
stream = t.routeEvent(event) stream = t.routeEvent(event)
} }
t.tracev = append(t.tracev, eventTrace{t0, stream, event})
ch := t.chanForStream(stream) ch := t.chanForStream(stream)
t.streamTabMu.Unlock() t.mu.Unlock()
if ch == nil { if ch == nil {
t.fatalfInNonMain("%s: (pre)send: canceled (test failed)", stream) t.fatalfInNonMain("%s: (pre)send: canceled (test failed)", stream)
...@@ -566,7 +576,7 @@ func (t *T) RxEvent(event interface{}) { ...@@ -566,7 +576,7 @@ func (t *T) RxEvent(event interface{}) {
} }
// chanForStream returns channel corresponding to stream. // chanForStream returns channel corresponding to stream.
// must be called under streamTabMu. // must be called under mu.
func (t *T) chanForStream(stream string) Chan { func (t *T) chanForStream(stream string) Chan {
if t.streamTab == nil { if t.streamTab == nil {
return nil // t is no longer operational after e.g. deadlock return nil // t is no longer operational after e.g. deadlock
...@@ -623,9 +633,9 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg { ...@@ -623,9 +633,9 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
func (t *T) xget1(stream string, eventp interface{}) *Msg { func (t *T) xget1(stream string, eventp interface{}) *Msg {
t.Helper() t.Helper()
t.streamTabMu.Lock() t.mu.Lock()
ch := t.chanForStream(stream) ch := t.chanForStream(stream)
t.streamTabMu.Unlock() t.mu.Unlock()
// XXX ch == nil -> no longer operational // XXX ch == nil -> no longer operational
...@@ -694,10 +704,10 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -694,10 +704,10 @@ func (t *T) closeStreamTab() (nnak int) {
t.Helper() t.Helper()
// mark streamTab no longer operational // mark streamTab no longer operational
t.streamTabMu.Lock() t.mu.Lock()
streamTab := t.streamTab streamTab := t.streamTab
t.streamTab = nil t.streamTab = nil
t.streamTabMu.Unlock() t.mu.Unlock()
if streamTab == nil { if streamTab == nil {
return // already closed return // already closed
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment