Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Support
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
N
neo
Project overview
Project overview
Details
Activity
Releases
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Labels
Merge Requests
2
Merge Requests
2
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Analytics
Analytics
CI / CD
Repository
Value Stream
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Jobs
Commits
Open sidebar
Kirill Smelkov
neo
Commits
9de249df
Commit
9de249df
authored
Dec 23, 2020
by
Kirill Smelkov
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
.
parent
db7d0481
Changes
2
Hide whitespace changes
Inline
Side-by-side
Showing
2 changed files
with
104 additions
and
75 deletions
+104
-75
go/internal/xtracing/tracetest/example_test.go
go/internal/xtracing/tracetest/example_test.go
+1
-1
go/internal/xtracing/tracetest/tracetest.go
go/internal/xtracing/tracetest/tracetest.go
+103
-74
No files found.
go/internal/xtracing/tracetest/example_test.go
View file @
9de249df
...
@@ -106,7 +106,7 @@ func TestTracetestExample(t *testing.T) {
...
@@ -106,7 +106,7 @@ func TestTracetestExample(t *testing.T) {
}()
}()
// assert that events come as expected
// assert that events come as expected
t
.
Expect
(
"t2"
,
eventH
i
(
"T1·C"
))
t
.
Expect
(
"t2"
,
eventH
ello
(
"T1·C"
))
t
.
Expect
(
"t1"
,
eventHi
(
"T1·A"
))
t
.
Expect
(
"t1"
,
eventHi
(
"T1·A"
))
t
.
Expect
(
"t1"
,
eventHello
(
"T1·B"
))
t
.
Expect
(
"t1"
,
eventHello
(
"T1·B"
))
...
...
go/internal/xtracing/tracetest/tracetest.go
View file @
9de249df
...
@@ -216,6 +216,7 @@ func NewChan(name string) Chan {
...
@@ -216,6 +216,7 @@ func NewChan(name string) Chan {
// ----------------------------------------
// ----------------------------------------
/*
// XXX -> tracetest.TSerial ? tracetest.TSeq?
// XXX -> tracetest.TSerial ? tracetest.TSeq?
// EventChecker is testing utility to verify that sequence of events coming
// EventChecker is testing utility to verify that sequence of events coming
...
@@ -276,7 +277,7 @@ func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
...
@@ -276,7 +277,7 @@ func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
// msg.ack <- error
// msg.ack <- error
evc
.
t
.
Fatalf
(
"%s: expect: %s:
\n
want: %v
\n
have: %v
\n
diff: %s"
,
evc.t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s
\n
",
evc.in.name(),
evc.in.name(),
reventExpect.Type(), reventExpect, revent,
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
pretty.Compare(reventExpect.Interface(), revent.Interface()))
...
@@ -358,6 +359,8 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
...
@@ -358,6 +359,8 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
evc.t.Fatal(bad)
evc.t.Fatal(bad)
}
}
*/
// ----------------------------------------
// ----------------------------------------
...
@@ -375,13 +378,14 @@ type EventRouter interface {
...
@@ -375,13 +378,14 @@ type EventRouter interface {
}
}
*/
*/
// EventDispatcher dispatches events to appropriate Chan for checking
//
//
EventDispatcher dispatches events to appropriate Chan for checking
// according to provided streams.
//
//
according to provided streams.
type
EventDispatcher
struct
{
//
type EventDispatcher struct {
streams
map
[
/*name*/
string
]
Chan
// XXX -> ChanTx? just str
//
streams map[/*name*/string]Chan // XXX -> ChanTx? just str
routeEvent
func
(
event
interface
{})
Chan
// XXX ----//----
//
routeEvent func(event interface{}) Chan // XXX ----//----
}
//
}
/*
// NewEventDispatcher creates new dispatcher and provides router to it.
// NewEventDispatcher creates new dispatcher and provides router to it.
func NewEventDispatcher(streams Streams) *EventDispatcher {
func NewEventDispatcher(streams Streams) *EventDispatcher {
// build streams as dict and make sure all streams are named uniquely
// build streams as dict and make sure all streams are named uniquely
...
@@ -399,70 +403,73 @@ func NewEventDispatcher(streams Streams) *EventDispatcher {
...
@@ -399,70 +403,73 @@ func NewEventDispatcher(streams Streams) *EventDispatcher {
routeEvent: streams.RouteEvent,
routeEvent: streams.RouteEvent,
}
}
}
}
// XXX -> tracetest.T ?
// Dispatch dispatches event to appropriate output channel.
//
// It is safe to use Dispatch from multiple goroutines simultaneously.
func
(
d
*
EventDispatcher
)
Dispatch
(
event
interface
{})
{
ch
:=
d
.
routeEvent
(
event
)
// assert that ch is within AllStreams() return.
if
ch
==
nil
{
panicf
(
"Event %#v routed to nil"
,
event
)
}
/* XXX deactivated for now because neo.tNewCluster creates NewEventDispatcher with initially empty router
ch_, ok := d.streams[ch.name]
if !ok {
all := []string{}
for name := range d.streams {
all = append(all, name)
}
sort.Slice(all, func(i, j int) bool {
return strings.Compare(all[i], all[j]) < 0
})
panicf("Event %#v routed to stream %q that is outside of AllStreams %q", event, ch.name, all)
}
if ch != ch_ {
panicf("Event %#v routed to stream %q, but channel is different from what AllSteams[%q] originally reported", event, ch.name, ch.name)
}
*/
*/
// TODO it is possible to empirically detect here if a test incorrectly
// // XXX -> tracetest.T ?
// decomposed its system into serial streams: consider unrelated to each
// // Dispatch dispatches event to appropriate output channel.
// other events A and B are incorrectly routed to the same channel. It
// //
// could be so happening that the order of checks on the test side is
// // It is safe to use Dispatch from multiple goroutines simultaneously.
// almost always correct and so the error is not visible. However
// func (d *EventDispatcher) Dispatch(event interface{}) {
//
// ch := d.routeEvent(event)
// if we add delays to delivery of either A or B
//
// and test both combinations
// // assert that ch is within AllStreams() return.
//
// if ch == nil {
// we will for sure detect the error as, if A and B are indeed
// panicf("Event %#v routed to nil", event)
// unrelated, one of the delay combination will result in events
// }
// delivered to test in different to what it expects order.
// /* XXX deactivated for now because neo.tNewCluster creates NewEventDispatcher with initially empty router
//
// ch_, ok := d.streams[ch.name]
// the time for delay could be taken as follows:
// if !ok {
//
// all := []string{}
// - run the test without delay; collect δt between events on particular stream
// for name := range d.streams {
// - take delay = max(δt)·10
// all = append(all, name)
//
// }
// to make sure there is indeed no different orderings possible on the
// sort.Slice(all, func(i, j int) bool {
// stream, rerun the test N(event-on-stream) times, and during i'th run
// return strings.Compare(all[i], all[j]) < 0
// delay i'th event.
// })
//
// panicf("Event %#v routed to stream %q that is outside of AllStreams %q", event, ch.name, all)
// TODO See also on this topic:
// }
// http://www.1024cores.net/home/relacy-race-detector
// if ch != ch_ {
// http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
// panicf("Event %#v routed to stream %q, but channel is different from what AllSteams[%q] originally reported", event, ch.name, ch.name)
// }
// */
//
// // TODO it is possible to empirically detect here if a test incorrectly
// // decomposed its system into serial streams: consider unrelated to each
// // other events A and B are incorrectly routed to the same channel. It
// // could be so happening that the order of checks on the test side is
// // almost always correct and so the error is not visible. However
// //
// // if we add delays to delivery of either A or B
// // and test both combinations
// //
// // we will for sure detect the error as, if A and B are indeed
// // unrelated, one of the delay combination will result in events
// // delivered to test in different to what it expects order.
// //
// // the time for delay could be taken as follows:
// //
// // - run the test without delay; collect δt between events on particular stream
// // - take delay = max(δt)·10
// //
// // to make sure there is indeed no different orderings possible on the
// // stream, rerun the test N(event-on-stream) times, and during i'th run
// // delay i'th event.
// //
// // TODO See also on this topic:
// // http://www.1024cores.net/home/relacy-race-detector
// // http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
//
//
// // TODO timeout: deadlock? (print all-in-flight events on timeout)
// // XXX or better ^^^ to do on receiver side?
// //
// // XXX -> if deadlock detection is done on receiver side (so in
// // EventChecker) -> we don't need EventDispatcher at all?
// ch.Send(event)
// }
// TODO timeout: deadlock? (print all-in-flight events on timeout)
// XXX or better ^^^ to do on receiver side?
//
// XXX -> if deadlock detection is done on receiver side (so in
// EventChecker) -> we don't need EventDispatcher at all?
ch
.
Send
(
event
)
}
/*
// Streams represents decomposition for events in a system into serial streams.
// Streams represents decomposition for events in a system into serial streams.
type Streams interface {
type Streams interface {
...
@@ -475,6 +482,10 @@ type Streams interface {
...
@@ -475,6 +482,10 @@ type Streams interface {
RouteEvent(event interface{}) Chan
RouteEvent(event interface{}) Chan
}
}
*/
// ------ vvv = ok ------
// T is similar to testing.T and is passed by Verify to tested function.
// T is similar to testing.T and is passed by Verify to tested function.
//
//
// See top-level package documentation for details.
// See top-level package documentation for details.
...
@@ -484,8 +495,21 @@ type T struct {
...
@@ -484,8 +495,21 @@ type T struct {
streamTabMu
sync
.
Mutex
streamTabMu
sync
.
Mutex
streamTab
map
[
/*stream*/
string
]
Chan
streamTab
map
[
/*stream*/
string
]
Chan
routeEvent
func
(
event
interface
{})
(
stream
string
)
routeEvent
func
(
event
interface
{})
(
stream
string
)
nakq
[]
nak
// naks queued to be sent after Fatal
}
type
nak
struct
{
msg
*
Msg
why
string
}
}
// XXX place; just use t.Cleanup instead?
func
(
t
*
T
)
queuenak
(
msg
*
Msg
,
why
string
)
{
t
.
nakq
=
append
(
t
.
nakq
,
nak
{
msg
,
why
})
}
// XXX doc; naming
// XXX doc; naming
func
(
t
*
T
)
SetEventRouter
(
routeEvent
func
(
event
interface
{})
(
stream
string
))
{
func
(
t
*
T
)
SetEventRouter
(
routeEvent
func
(
event
interface
{})
(
stream
string
))
{
t
.
streamTabMu
.
Lock
()
t
.
streamTabMu
.
Lock
()
...
@@ -504,7 +528,7 @@ func (t *T) RxEvent(event interface{}) {
...
@@ -504,7 +528,7 @@ func (t *T) RxEvent(event interface{}) {
t
.
streamTabMu
.
Unlock
()
t
.
streamTabMu
.
Unlock
()
if
ch
==
nil
{
if
ch
==
nil
{
t
.
fatalfInNonMain
(
"test is no longer
operational
"
)
t
.
fatalfInNonMain
(
"test is no longer
running
"
)
}
}
// TODO it is possible to empirically detect here if a test incorrectly
// TODO it is possible to empirically detect here if a test incorrectly
...
@@ -535,7 +559,7 @@ func (t *T) RxEvent(event interface{}) {
...
@@ -535,7 +559,7 @@ func (t *T) RxEvent(event interface{}) {
// TODO timeout: deadlock? (print all-in-flight events on timeout)
// TODO timeout: deadlock? (print all-in-flight events on timeout)
// XXX or better ^^^ to do on receiver side?
// XXX or better ^^^ to do on receiver side?
-> yes - on receiver
ch
.
Send
(
event
)
ch
.
Send
(
event
)
}
}
...
@@ -591,8 +615,8 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
...
@@ -591,8 +615,8 @@ func (t *T) expect1(stream string, eventExpect interface{}) *Msg {
revent
:=
reventp
.
Elem
()
revent
:=
reventp
.
Elem
()
if
!
reflect
.
DeepEqual
(
revent
.
Interface
(),
reventExpect
.
Interface
())
{
if
!
reflect
.
DeepEqual
(
revent
.
Interface
(),
reventExpect
.
Interface
())
{
//msg.Nak("expect failed") // XXX merge into Fatal
t
.
queuenak
(
msg
,
"unexpected event data"
)
t
.
Fatalf
(
"%s: expect: %s:
\n
want: %v
\n
have: %v
\n
diff:
\n
%s"
,
t
.
Fatalf
(
"%s: expect: %s:
\n
want: %v
\n
have: %v
\n
diff:
\n
%s
\n
"
,
stream
,
stream
,
reventExpect
.
Type
(),
reventExpect
,
revent
,
reventExpect
.
Type
(),
reventExpect
,
revent
,
pretty
.
Compare
(
reventExpect
.
Interface
(),
revent
.
Interface
()))
pretty
.
Compare
(
reventExpect
.
Interface
(),
revent
.
Interface
()))
...
@@ -626,7 +650,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
...
@@ -626,7 +650,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
reventp
:=
reflect
.
ValueOf
(
eventp
)
reventp
:=
reflect
.
ValueOf
(
eventp
)
if
reventp
.
Type
()
.
Elem
()
!=
reflect
.
TypeOf
(
msg
.
Event
)
{
if
reventp
.
Type
()
.
Elem
()
!=
reflect
.
TypeOf
(
msg
.
Event
)
{
msg
.
nak
(
"test failed"
)
// XXX message ok?
t
.
queuenak
(
msg
,
"unexpected event type"
)
t
.
Fatalf
(
"%s: expect: %s: got %#v"
,
ch
.
name
(),
reventp
.
Elem
()
.
Type
(),
msg
.
Event
)
t
.
Fatalf
(
"%s: expect: %s: got %#v"
,
ch
.
name
(),
reventp
.
Elem
()
.
Type
(),
msg
.
Event
)
}
}
...
@@ -636,6 +660,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
...
@@ -636,6 +660,7 @@ func (t *T) xget1(stream string, eventp interface{}) *Msg {
return
msg
return
msg
}
}
/*
// deadlock reports diagnostic when retrieving event from stream times out.
// deadlock reports diagnostic when retrieving event from stream times out.
//
//
// Timing out on recv means there is a deadlock either if no event was sent at
// Timing out on recv means there is a deadlock either if no event was sent at
...
@@ -694,6 +719,7 @@ func (t *T) deadlock(stream string, eventp interface{}) {
...
@@ -694,6 +719,7 @@ func (t *T) deadlock(stream string, eventp interface{}) {
}
}
t.FailNow()
t.FailNow()
}
}
*/
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
// fatalfInNonMain should be called for fatal cases in non-main goroutines instead of panic.
...
@@ -730,7 +756,7 @@ func (t *T) FailNow() {
...
@@ -730,7 +756,7 @@ func (t *T) FailNow() {
type
sendInfo
struct
{
ch
Chan
;
msg
*
Msg
}
type
sendInfo
struct
{
ch
Chan
;
msg
*
Msg
}
var
sendv
[]
sendInfo
var
sendv
[]
sendInfo
for
_
,
ch
:=
range
streamTab
{
for
_
,
ch
:=
range
streamTab
{
// check whether someone is sending on
a dst
without blocking.
// check whether someone is sending on
channels
without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
// if yes - report to sender there is a problem - so it can cancel its task.
select
{
select
{
case
msg
:=
<-
ch
.
_rxq
()
:
case
msg
:=
<-
ch
.
_rxq
()
:
...
@@ -759,8 +785,11 @@ func (t *T) FailNow() {
...
@@ -759,8 +785,11 @@ func (t *T) FailNow() {
// nak them only after deadlock printout, so that the deadlock text
// nak them only after deadlock printout, so that the deadlock text
// comes first, and their "panics" don't get intermixed with it.
// comes first, and their "panics" don't get intermixed with it.
t
.
Log
(
bad
)
t
.
Log
(
bad
)
for
_
,
__
:=
range
t
.
nakq
{
__
.
msg
.
nak
(
__
.
why
)
}
for
_
,
__
:=
range
sendv
{
for
_
,
__
:=
range
sendv
{
__
.
msg
.
nak
(
"
XXX deadlock
"
)
// XXX reason can be custom / different
__
.
msg
.
nak
(
"
test failed
"
)
// XXX reason can be custom / different
}
}
// in any case close channel where future Sends may arrive so that will "panic" too.
// in any case close channel where future Sends may arrive so that will "panic" too.
for
_
,
ch
:=
range
streamTab
{
for
_
,
ch
:=
range
streamTab
{
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment