Commit 1ec3a91e authored by Kirill Smelkov's avatar Kirill Smelkov

. Good to go into 2021

parent 489dab67
...@@ -214,276 +214,6 @@ func _NewChan(name string) _Chan { ...@@ -214,276 +214,6 @@ func _NewChan(name string) _Chan {
// ---------------------------------------- // ----------------------------------------
/*
// XXX -> tracetest.TSerial ? tracetest.TSeq?
// EventChecker is testing utility to verify that sequence of events coming
// from a single Chan is as expected.
type EventChecker struct {
t testing.TB
in Chan // XXX -> ChanRx ?
dispatch *EventDispatcher // XXX why here?
}
// NewEventChecker constructs new EventChecker that will retrieve events from
// `in` and use `t` for tests reporting.
//
// XXX -> dispatch.NewChecker() ?
// XXX kill dispatch arg in NewEventChecker (and set it only from dispatch.NewChecker)
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in Chan) *EventChecker {
return &EventChecker{t: t, in: in, dispatch: dispatch}
}
// get1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
func (evc *EventChecker) xget1(eventp interface{}) *Msg {
evc.t.Helper()
var msg *Msg
select {
case msg = <-evc.in._rxq(): // unwrapped Recv
// ok
case <-time.After(*deadTime):
evc.deadlock(eventp) // XXX -> Dispatcher + allow to tune?
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
// msg.ack <- error
evc.t.Fatalf("%s: expect: %s: got %#v", evc.in.name(), reventp.Elem().Type(), msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg
}
// expect1 asks checker to expect next event to be eventExpect (both type and value)
//
// if checks do not pass - fatal testing error is raised.
func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
evc.t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := evc.xget1(reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
// msg.ack <- error
evc.t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s\n",
evc.in.name(),
reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface()))
}
return msg
}
// Expect asks checker to receive next event and verify it to be equal to expected.
//
// If check is successful ACK is sent back to event producer.
// If check does not pass - fatal testing error is raised.
func (evc *EventChecker) Expect(expected interface{}) {
evc.t.Helper()
msg := evc.expect1(expected)
msg.Ack()
}
// ExpectNoACK asks checker to receive next event and verify it to be equal to
// expected without sending back ACK.
//
// No ACK is sent back to event producer - the caller becomes responsible to
// send ACK back by itself.
//
// If check does not pass - fatal testing error is raised.
func (evc *EventChecker) ExpectNoACK(expected interface{}) *Msg {
evc.t.Helper()
msg := evc.expect1(expected)
return msg
}
// deadlock reports diagnostic when retrieving event from .in times out.
//
// Timing out on recv means there is a deadlock either if no event was sent at
// all, or some other event was sent to another channel/checker.
//
// Report the full picture - what was expected and what was sent where.
func (evc *EventChecker) deadlock(eventp interface{}) {
evc.t.Helper()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name(), eventp)
type sendInfo struct{dst Chan; event interface{}}
var sendv []sendInfo
for _, dst := range evc.dispatch.streams {
// check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
select {
case msg := <-dst._rxq():
sendv = append(sendv, sendInfo{dst, msg.Event})
//msg.ack <- false
default:
}
// XXX panic triggering disabled because if sender panics we have no chance to continue
// TODO retest this
// in any case close channel where future Sends may arrive so that will panic too.
//close(dst.msgq)
}
// order channels by name
sort.Slice(sendv, func(i, j int) bool {
return strings.Compare(sendv[i].dst.name(), sendv[j].dst.name()) < 0
})
if len(sendv) == 0 {
bad += fmt.Sprintf("noone is sending\n")
} else {
bad += fmt.Sprintf("there are %d pending sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name(), __.event, __.event)
}
}
evc.t.Fatal(bad)
}
*/
// ----------------------------------------
// XXX -> Streams
/*
// EventRouter is the interface used for routing events to appropriate output Chan.
//
// It should be safe to use EventRouter from multiple goroutines simultaneously.
type EventRouter interface {
// Route should return appropriate destination for event.
Route(event interface{}) *Chan
// AllRoutes should return all routing destinations.
AllRoutes() []*Chan
}
*/
// // EventDispatcher dispatches events to appropriate Chan for checking
// // according to provided streams.
// type EventDispatcher struct {
// streams map[/*name*/string]Chan // XXX -> ChanTx? just str
// routeEvent func(event interface{}) Chan // XXX ----//----
// }
/*
// NewEventDispatcher creates new dispatcher and provides router to it.
func NewEventDispatcher(streams Streams) *EventDispatcher {
// build streams as dict and make sure all streams are named uniquely
streamDict := make(map[string]Chan)
for _, ch := range streams.AllStreams() {
name := ch.name()
_, already := streamDict[name]
if already {
panicf("duplicate stream name %q", name)
}
streamDict[name] = ch
}
return &EventDispatcher{
streams: streamDict,
routeEvent: streams.RouteEvent,
}
}
*/
// // XXX -> tracetest.T ?
// // Dispatch dispatches event to appropriate output channel.
// //
// // It is safe to use Dispatch from multiple goroutines simultaneously.
// func (d *EventDispatcher) Dispatch(event interface{}) {
// ch := d.routeEvent(event)
//
// // assert that ch is within AllStreams() return.
// if ch == nil {
// panicf("Event %#v routed to nil", event)
// }
// /* XXX deactivated for now because neo.tNewCluster creates NewEventDispatcher with initially empty router
// ch_, ok := d.streams[ch.name]
// if !ok {
// all := []string{}
// for name := range d.streams {
// all = append(all, name)
// }
// sort.Slice(all, func(i, j int) bool {
// return strings.Compare(all[i], all[j]) < 0
// })
// panicf("Event %#v routed to stream %q that is outside of AllStreams %q", event, ch.name, all)
// }
// if ch != ch_ {
// panicf("Event %#v routed to stream %q, but channel is different from what AllSteams[%q] originally reported", event, ch.name, ch.name)
// }
// */
//
// // TODO it is possible to empirically detect here if a test incorrectly
// // decomposed its system into serial streams: consider unrelated to each
// // other events A and B are incorrectly routed to the same channel. It
// // could be so happening that the order of checks on the test side is
// // almost always correct and so the error is not visible. However
// //
// // if we add delays to delivery of either A or B
// // and test both combinations
// //
// // we will for sure detect the error as, if A and B are indeed
// // unrelated, one of the delay combination will result in events
// // delivered to test in different to what it expects order.
// //
// // the time for delay could be taken as follows:
// //
// // - run the test without delay; collect δt between events on particular stream
// // - take delay = max(δt)·10
// //
// // to make sure there is indeed no different orderings possible on the
// // stream, rerun the test N(event-on-stream) times, and during i'th run
// // delay i'th event.
// //
// // TODO See also on this topic:
// // http://www.1024cores.net/home/relacy-race-detector
// // http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
//
//
// // TODO timeout: deadlock? (print all-in-flight events on timeout)
// // XXX or better ^^^ to do on receiver side?
// //
// // XXX -> if deadlock detection is done on receiver side (so in
// // EventChecker) -> we don't need EventDispatcher at all?
// ch.Send(event)
// }
/*
// Streams represents decomposition for events in a system into serial streams.
type Streams interface {
// AllStreams should return all streams of the system.
// Streams must have unique names.
AllStreams() []Chan
// RouteEvent should return particular stream into which event should be routed.
// Returned stream must be one of reported by AllStreams.
RouteEvent(event interface{}) Chan
}
*/
// ------ vvv = ok ------
// _testing_TB is alias for testing.TB that is non-public when embedded into a struct. // _testing_TB is alias for testing.TB that is non-public when embedded into a struct.
type _testing_TB = testing.TB type _testing_TB = testing.TB
...@@ -799,15 +529,6 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -799,15 +529,6 @@ func (t *T) closeStreamTab() (nnak int) {
// TODO func Verify(system)
// system() ->:
// (Streams)
// - []Chan where events will be emitted
// - Route(event) -> Chan which event goes to which event stream (= channel)
//
// - main func to run the system
// - testf (serial) func to verify events that comes from channels
// Run runs f under tracing. // Run runs f under tracing.
// //
// It is similar to Verify but f is ran only once. // It is similar to Verify but f is ran only once.
......
...@@ -201,19 +201,6 @@ func (c *tStreamChecker) Expect(event interface{}) { ...@@ -201,19 +201,6 @@ func (c *tStreamChecker) Expect(event interface{}) {
c.t.Helper() c.t.Helper()
c.t.Expect(c.stream, event) c.t.Expect(c.stream, event)
} }
/*
func (t *tCluster) Checker(name string) *tracetest.EventChecker {
t.tabMu.Lock()
defer t.tabMu.Unlock()
c, ok := t.checkTab[name]
if !ok {
panic(fmt.Sprintf("test cluster: no checker for %q", name))
}
return c
}
*/
// registerNewNode registers new node with given name. // registerNewNode registers new node with given name.
// //
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment