Commit 6e312d66 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 28666738
......@@ -108,7 +108,7 @@ type Chan struct {
// The goroutine which sent the message will wait for Ack before continue.
type Msg struct {
Event interface {}
ack chan<- bool
ack chan<- bool // XXX -> chan<- error and abort tested goroutine if error?
}
......@@ -123,7 +123,7 @@ func (ch *Chan) Send(event interface{}) {
ch.msgq <- &Msg{event, ack}
ok := <-ack
if !ok {
panic(fmt.Sprintf("%s: send: deadlock", ch.name))
panicf("%s: send: deadlock", ch.name)
}
}
......@@ -170,7 +170,6 @@ func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *Chan) *EventCh
// get1 gets 1 event in place and checks it has expected type
//
// if checks do not pass - fatal testing error is raised
// XXX why eventp, not just event here?
func (evc *EventChecker) xget1(eventp interface{}) *Msg {
evc.t.Helper()
var msg *Msg
......@@ -180,11 +179,12 @@ func (evc *EventChecker) xget1(eventp interface{}) *Msg {
// ok
case <-time.After(2*time.Second): // XXX timeout hardcoded
evc.deadlock(eventp)
evc.deadlock(eventp) // XXX -> Dispatcher + allow to tune?
}
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
// msg.ack <- error
evc.t.Fatalf("%s: expect: %s: got %#v", evc.in.name, reventp.Elem().Type(), msg.Event)
}
......@@ -207,6 +207,7 @@ func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
// msg.ack <- error
evc.t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s",
evc.in.name,
reventExpect.Type(), reventExpect, revent,
......@@ -242,22 +243,19 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *Msg {
}
// deadlock reports diagnostic when retrieving event from .in timed out.
// deadlock reports diagnostic when retrieving event from .in times out.
//
// timing out on recv means there is a deadlock either if no event was sent at
// Timing out on recv means there is a deadlock either if no event was sent at
// all, or some other event was sent to another channel/checker.
//
// report the full picture - what was expected and what was sent where.
// Report the full picture - what was expected and what was sent where.
func (evc *EventChecker) deadlock(eventp interface{}) {
evc.t.Helper()
rt := evc.dispatch.rt
dstv := rt.AllRoutes()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name, eventp)
type sendInfo struct{dst *Chan; event interface{}}
var sendv []sendInfo
for _, dst := range dstv {
for _, dst := range evc.dispatch.streams {
// check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task.
select {
......@@ -295,6 +293,8 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
// ----------------------------------------
// XXX -> Streams
/*
// EventRouter is the interface used for routing events to appropriate output Chan.
//
// It should be safe to use EventRouter from multiple goroutines simultaneously.
......@@ -305,24 +305,61 @@ type EventRouter interface {
// AllRoutes should return all routing destinations.
AllRoutes() []*Chan
}
*/
// XXX -> T
type T = EventDispatcher
// EventDispatcher dispatches events to appropriate Chan for checking
// according to provided router.
// according to provided streams.
type EventDispatcher struct {
rt EventRouter
streams map[/*name*/string]*Chan
routeEvent func(event interface{}) *Chan
}
// NewEventDispatcher creates new dispatcher and provides router to it.
func NewEventDispatcher(router EventRouter) *EventDispatcher {
return &EventDispatcher{rt: router}
func NewEventDispatcher(streams Streams) *EventDispatcher {
// build streams as dict and make sure all streams are named uniquely
streamDict := make(map[string]*Chan)
for _, ch := range streams.AllStreams() {
_, already := streamDict[ch.name]
if already {
panicf("duplicate stream name %q", ch.name)
}
streamDict[ch.name] = ch
}
return &EventDispatcher{
streams: streamDict,
routeEvent: streams.RouteEvent,
}
}
// XXX -> tracetest.T ?
// Dispatch dispatches event to appropriate output channel.
//
// It is safe to use Dispatch from multiple goroutines simultaneously.
func (d *EventDispatcher) Dispatch(event interface{}) {
outch := d.rt.Route(event)
// XXX if nil?
ch := d.routeEvent(event)
// assert that ch is within AllStreams() return.
if ch == nil {
panicf("Event %#v routed to nil", event)
}
/* XXX deactivated for now because neo.tNewCluster creates NewEventDispatcher with initially empty router
ch_, ok := d.streams[ch.name]
if !ok {
all := []string{}
for name := range d.streams {
all = append(all, name)
}
sort.Slice(all, func(i, j int) bool {
return strings.Compare(all[i], all[j]) < 0
})
panicf("Event %#v routed to stream %q that is outside of AllStreams %q", event, ch.name, all)
}
if ch != ch_ {
panicf("Event %#v routed to stream %q, but channel is different from what AllSteams[%q] originally reported", event, ch.name, ch.name)
}
*/
// TODO it is possible to emperically detect here if a test incorrectly
// decomposed its system into serial streams: consider unrelated to each
......@@ -356,5 +393,45 @@ func (d *EventDispatcher) Dispatch(event interface{}) {
//
// XXX -> if deadlock detection is done on receiver side (so in
// EventChecker) -> we don't need EventDispatcher at all?
outch.Send(event)
ch.Send(event)
}
// Streams represents decomposition for events in a system into serial streams.
type Streams interface {
// AllStreams should return all streams of the system.
// Streams must have unique names.
AllStreams() []*Chan
// RouteEvent should return particular stream into which event should be routed.
// Returned stream must be one of reported by AllStreams.
RouteEvent(event interface{}) *Chan
}
// TODO func Verify(system)
// system() ->:
// (Streams)
// - []Chan where events will be emitted
// - Route(event) -> Chan which event goes to which event stream (= channel)
//
// - main func to run the system
// - testf (serial) func to verify events that comes from channels
// Verify verifies a system.
// XXX
func Verify(system func() (
Streams,
/*main*/ func(),
/*testf*/ func(t *T),
)) {
// XXX
}
// ---- misc ----
func panicf(format string, argv ...interface{}) {
panic(fmt.Sprintf(format, argv...))
}
......@@ -52,7 +52,14 @@ type tCluster struct {
tabMu sync.Mutex
nodeTab map[string/*node*/]*tNode
checkTab map[string/*node*/]*tracetest.EventChecker
// checkTab keeps event checkers for the following event streams:
// 1) events specific to a node, and
// 2) events specific to exchange in between node1-node2 for
// communications that originate at node1.
//
// NOTE that node1-node2 and node2-node1 are different and come with
// separate event checkers.
checkTab map[string/*node, or node1->node2*/]*tracetest.EventChecker
}
// tNode represents information about a test node ... XXX
......@@ -124,7 +131,7 @@ func (t *tCluster) Checker(name string) *tracetest.EventChecker {
c, ok := t.checkTab[name]
if !ok {
panic(fmt.Sprintf("test cluster: no %q checker", name))
panic(fmt.Sprintf("test cluster: no checker for %q", name))
}
return c
......@@ -177,7 +184,7 @@ func (t *tCluster) registerNewNode(name string) *tNode {
// The master will be accepting incoming connections at node:1.
// The node must be not yet existing and will be dedicated to the created master fully. XXX
//
// XXX error of creating py process?
// XXX error of creating py process? -> t.Fatal
func (t *tCluster) NewMaster(name string) ITestMaster {
node := t.registerNewNode(name)
m := tNewMaster(t.name, ":1", node.net)
......
......@@ -67,7 +67,7 @@ type eventNeoSend struct {
// event: cluster state changed
type eventClusterState struct {
//Ptr *neo.ClusterState // pointer to variable which holds the state
Where string
Where string // XXX document what Where means - (host name of a node ?)
State proto.ClusterState
}
......@@ -157,6 +157,7 @@ func δnode(where string, laddr string, typ proto.NodeType, num int32, state pro
type EventRouter struct {
mu sync.Mutex
// XXX all unbranched events go to here?
defaultq *tracetest.Chan
// events specific to particular node - e.g. node starts listening,
......@@ -181,6 +182,16 @@ type EventRouter struct {
connected map[string /*addr-addr*/]bool
}
// linkDst represents destination for events on a network link.
//
// Events go to either a or b depending on which side initiated particular
// connection on top of the link.
type linkDst struct {
a *tracetest.Chan // net cause was on dialer
b *tracetest.Chan // net cause was on listener
}
func NewEventRouter() *EventRouter {
return &EventRouter{
defaultq: tracetest.NewChan("default"),
......@@ -191,7 +202,7 @@ func NewEventRouter() *EventRouter {
}
}
func (r *EventRouter) AllRoutes() []*tracetest.Chan {
func (r *EventRouter) AllStreams() []*tracetest.Chan {
rtset := map[*tracetest.Chan]int{}
rtset[r.defaultq] = 1
for _, dst := range r.byNode {
......@@ -234,7 +245,7 @@ func host(addr string) string {
}
// Route routes events according to rules specified via Branch*().
func (r *EventRouter) Route(event interface{}) (dst *tracetest.Chan) {
func (r *EventRouter) RouteEvent(event interface{}) (dst *tracetest.Chan) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -376,13 +387,3 @@ func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.Chan) {
r.byLink[link] = &linkDst{dsta, dstb}
}
// linkDst represents destination for events on a network link.
//
// Events go to either a or b depending on which side initiated particular
// connection on top of the link.
type linkDst struct {
a *tracetest.Chan // net cause was on dialer
b *tracetest.Chan // net cause was on listener
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment