Commit 454b6576 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6e312d66
...@@ -89,7 +89,10 @@ import ( ...@@ -89,7 +89,10 @@ import (
"github.com/kylelemons/godebug/pretty" "github.com/kylelemons/godebug/pretty"
) )
var chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels") var (
chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
)
// Chan provides synchronous channel with additional property that send // Chan provides synchronous channel with additional property that send
// blocks until receiving side explicitly acknowledges message was received and // blocks until receiving side explicitly acknowledges message was received and
...@@ -98,9 +101,31 @@ var chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are ...@@ -98,9 +101,31 @@ var chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are
// New channels must be created via NewChan. // New channels must be created via NewChan.
// //
// It is safe to use Chan from multiple goroutines simultaneously. // It is safe to use Chan from multiple goroutines simultaneously.
type Chan struct { type Chan interface {
msgq chan *Msg ChanTx
name string ChanRx
name() string // name of the channel
}
// ChanTx represents "send-only" half of Chan.
// It is similar to chan<- .
type ChanTx interface {
// Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics. XXX
Send(event interface{})
}
// ChanRx represents "receive-only" half of Chan.
// It is similar to <-chan .
type ChanRx interface {
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
Recv() *Msg
// _rxq returns raw channel that is serving ChanRx.
// it is used internally to use ChanRx in select.
_rxq() <-chan *Msg
} }
// Msg represents message with 1 event sent over Chan. // Msg represents message with 1 event sent over Chan.
...@@ -112,58 +137,74 @@ type Msg struct { ...@@ -112,58 +137,74 @@ type Msg struct {
} }
// Send sends event to a consumer and waits for ack.
// // _chan implements Chan.
// if main testing goroutine detects any problem Send panics. XXX type _chan struct {
func (ch *Chan) Send(event interface{}) { msgq chan *Msg
_name string
}
// name implements Chan.
func (ch *_chan) name() string {
return ch._name
}
// Send implements ChanTx.
func (ch *_chan) Send(event interface{}) {
if *chatty { if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name, event, event) fmt.Printf("%s <- %T %v\n", ch.name(), event, event)
} }
ack := make(chan bool) ack := make(chan bool)
ch.msgq <- &Msg{event, ack} ch.msgq <- &Msg{event, ack}
ok := <-ack ok := <-ack
if !ok { if !ok {
panicf("%s: send: deadlock", ch.name) panicf("%s: send: deadlock", ch.name())
} }
} }
// Recv receives message from a producer. // Recv implements ChanRx.
// func (ch *_chan) Recv() *Msg {
// The consumer, after dealing with the message, must send back an ack.
func (ch *Chan) Recv() *Msg {
msg := <-ch.msgq msg := <-ch.msgq
return msg return msg
} }
// _rxq implements ChanRx.
func (ch *_chan) _rxq() <-chan *Msg {
return ch.msgq
}
// XXX -> Unpause?
// Ack acknowledges the event was processed and unblocks producer goroutine. // Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *Msg) Ack() { func (m *Msg) Ack() {
m.ack <- true m.ack <- true
} }
// NewChan creates new Chan channel. // NewChan creates new Chan channel.
func NewChan(name string) *Chan { func NewChan(name string) Chan {
// XXX somehow avoid channels with duplicate names // XXX somehow avoid channels with duplicate names
// (only allow to create named channels from under dispatcher?) // (only allow to create named channels from under dispatcher?)
return &Chan{msgq: make(chan *Msg), name: name} return &_chan{msgq: make(chan *Msg), _name: name}
} }
// ---------------------------------------- // ----------------------------------------
// XXX -> tracetest.TSerial ? tracetest.TSeq?
// EventChecker is testing utility to verify that sequence of events coming // EventChecker is testing utility to verify that sequence of events coming
// from a single Chan is as expected. // from a single Chan is as expected.
type EventChecker struct { type EventChecker struct {
t testing.TB t testing.TB
in *Chan in Chan // XXX -> ChanRx ?
dispatch *EventDispatcher dispatch *EventDispatcher // XXX why here?
} }
// NewEventChecker constructs new EventChecker that will retrieve events from // NewEventChecker constructs new EventChecker that will retrieve events from
// `in` and use `t` for tests reporting. // `in` and use `t` for tests reporting.
// //
// XXX -> dispatch.NewChecker() ? // XXX -> dispatch.NewChecker() ?
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *Chan) *EventChecker { // XXX kill dispatch arg in NewEventChecker (and set it only from dispatch.NewChecker)
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in Chan) *EventChecker {
return &EventChecker{t: t, in: in, dispatch: dispatch} return &EventChecker{t: t, in: in, dispatch: dispatch}
} }
...@@ -175,17 +216,17 @@ func (evc *EventChecker) xget1(eventp interface{}) *Msg { ...@@ -175,17 +216,17 @@ func (evc *EventChecker) xget1(eventp interface{}) *Msg {
var msg *Msg var msg *Msg
select { select {
case msg = <-evc.in.msgq: // unwrapped Recv case msg = <-evc.in._rxq(): // unwrapped Recv
// ok // ok
case <-time.After(2*time.Second): // XXX timeout hardcoded case <-time.After(*deadTime):
evc.deadlock(eventp) // XXX -> Dispatcher + allow to tune? evc.deadlock(eventp) // XXX -> Dispatcher + allow to tune?
} }
reventp := reflect.ValueOf(eventp) reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) { if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
// msg.ack <- error // msg.ack <- error
evc.t.Fatalf("%s: expect: %s: got %#v", evc.in.name, reventp.Elem().Type(), msg.Event) evc.t.Fatalf("%s: expect: %s: got %#v", evc.in.name(), reventp.Elem().Type(), msg.Event)
} }
// *eventp = msg.Event // *eventp = msg.Event
...@@ -209,7 +250,7 @@ func (evc *EventChecker) expect1(eventExpect interface{}) *Msg { ...@@ -209,7 +250,7 @@ func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) { if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
// msg.ack <- error // msg.ack <- error
evc.t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s", evc.t.Fatalf("%s: expect: %s:\nwant: %v\nhave: %v\ndiff: %s",
evc.in.name, evc.in.name(),
reventExpect.Type(), reventExpect, revent, reventExpect.Type(), reventExpect, revent,
pretty.Compare(reventExpect.Interface(), revent.Interface())) pretty.Compare(reventExpect.Interface(), revent.Interface()))
} }
...@@ -252,14 +293,14 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *Msg { ...@@ -252,14 +293,14 @@ func (evc *EventChecker) ExpectNoACK(expected interface{}) *Msg {
func (evc *EventChecker) deadlock(eventp interface{}) { func (evc *EventChecker) deadlock(eventp interface{}) {
evc.t.Helper() evc.t.Helper()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name, eventp) bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name(), eventp)
type sendInfo struct{dst *Chan; event interface{}} type sendInfo struct{dst Chan; event interface{}}
var sendv []sendInfo var sendv []sendInfo
for _, dst := range evc.dispatch.streams { for _, dst := range evc.dispatch.streams {
// check whether someone is sending on a dst without blocking. // check whether someone is sending on a dst without blocking.
// if yes - report to sender there is a problem - so it can cancel its task. // if yes - report to sender there is a problem - so it can cancel its task.
select { select {
case msg := <-dst.msgq: case msg := <-dst._rxq():
sendv = append(sendv, sendInfo{dst, msg.Event}) sendv = append(sendv, sendInfo{dst, msg.Event})
//msg.ack <- false //msg.ack <- false
...@@ -269,13 +310,13 @@ func (evc *EventChecker) deadlock(eventp interface{}) { ...@@ -269,13 +310,13 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
// XXX panic triggering disabled because if sender panics we have no chance to continue // XXX panic triggering disabled because if sender panics we have no chance to continue
// TODO retest this // TODO retest this
// in any case close channel where futer Sends may arrive so that will panic too. // in any case close channel where future Sends may arrive so that will panic too.
//close(dst.msgq) //close(dst.msgq)
} }
// order channels by name // order channels by name
sort.Slice(sendv, func(i, j int) bool { sort.Slice(sendv, func(i, j int) bool {
return strings.Compare(sendv[i].dst.name, sendv[j].dst.name) < 0 return strings.Compare(sendv[i].dst.name(), sendv[j].dst.name()) < 0
}) })
if len(sendv) == 0 { if len(sendv) == 0 {
...@@ -283,7 +324,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) { ...@@ -283,7 +324,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
} else { } else {
bad += fmt.Sprintf("there are %d sender(s) on other channel(s):\n", len(sendv)) bad += fmt.Sprintf("there are %d sender(s) on other channel(s):\n", len(sendv))
for _, __ := range sendv { for _, __ := range sendv {
bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name, __.event, __.event) bad += fmt.Sprintf("%s:\t%T %v\n", __.dst.name(), __.event, __.event)
} }
} }
...@@ -312,20 +353,21 @@ type T = EventDispatcher ...@@ -312,20 +353,21 @@ type T = EventDispatcher
// EventDispatcher dispatches events to appropriate Chan for checking // EventDispatcher dispatches events to appropriate Chan for checking
// according to provided streams. // according to provided streams.
type EventDispatcher struct { type EventDispatcher struct {
streams map[/*name*/string]*Chan streams map[/*name*/string]Chan // XXX -> ChanTx? just str
routeEvent func(event interface{}) *Chan routeEvent func(event interface{}) Chan // XXX ----//----
} }
// NewEventDispatcher creates new dispatcher and provides router to it. // NewEventDispatcher creates new dispatcher and provides router to it.
func NewEventDispatcher(streams Streams) *EventDispatcher { func NewEventDispatcher(streams Streams) *EventDispatcher {
// build streams as dict and make sure all streams are named uniquely // build streams as dict and make sure all streams are named uniquely
streamDict := make(map[string]*Chan) streamDict := make(map[string]Chan)
for _, ch := range streams.AllStreams() { for _, ch := range streams.AllStreams() {
_, already := streamDict[ch.name] name := ch.name()
_, already := streamDict[name]
if already { if already {
panicf("duplicate stream name %q", ch.name) panicf("duplicate stream name %q", name)
} }
streamDict[ch.name] = ch streamDict[name] = ch
} }
return &EventDispatcher{ return &EventDispatcher{
streams: streamDict, streams: streamDict,
...@@ -361,10 +403,10 @@ func (d *EventDispatcher) Dispatch(event interface{}) { ...@@ -361,10 +403,10 @@ func (d *EventDispatcher) Dispatch(event interface{}) {
} }
*/ */
// TODO it is possible to emperically detect here if a test incorrectly // TODO it is possible to empirically detect here if a test incorrectly
// decomposed its system into serial streams: consider unrelated to each // decomposed its system into serial streams: consider unrelated to each
// other events A and B are incorrectly routed to the same channel. It // other events A and B are incorrectly routed to the same channel. It
// could be so happenning that the order of checks on the test side is // could be so happening that the order of checks on the test side is
// almost always correct and so the error is not visible. However // almost always correct and so the error is not visible. However
// //
// if we add delays to delivery of either A or B // if we add delays to delivery of either A or B
...@@ -388,7 +430,7 @@ func (d *EventDispatcher) Dispatch(event interface{}) { ...@@ -388,7 +430,7 @@ func (d *EventDispatcher) Dispatch(event interface{}) {
// http://www.1024cores.net/home/relacy-race-detector/rrd-introduction // http://www.1024cores.net/home/relacy-race-detector/rrd-introduction
// TODO timeout: deadlock? (print all-in-flight events on timout) // TODO timeout: deadlock? (print all-in-flight events on timeout)
// XXX or better ^^^ to do on receiver side? // XXX or better ^^^ to do on receiver side?
// //
// XXX -> if deadlock detection is done on receiver side (so in // XXX -> if deadlock detection is done on receiver side (so in
...@@ -401,11 +443,11 @@ func (d *EventDispatcher) Dispatch(event interface{}) { ...@@ -401,11 +443,11 @@ func (d *EventDispatcher) Dispatch(event interface{}) {
type Streams interface { type Streams interface {
// AllStreams should return all streams of the system. // AllStreams should return all streams of the system.
// Streams must have unique names. // Streams must have unique names.
AllStreams() []*Chan AllStreams() []Chan
// RouteEvent should return particular stream into which event should be routed. // RouteEvent should return particular stream into which event should be routed.
// Returned stream must be one of reported by AllStreams. // Returned stream must be one of reported by AllStreams.
RouteEvent(event interface{}) *Chan RouteEvent(event interface{}) Chan
} }
......
...@@ -428,7 +428,7 @@ func TestMasterStorage(t0 *testing.T) { ...@@ -428,7 +428,7 @@ func TestMasterStorage(t0 *testing.T) {
// XXX hack - better we don't need it. // XXX hack - better we don't need it.
// XXX -> with testenv.MkCluster() we won't need it // XXX -> with testenv.MkCluster() we won't need it
type tdispatch1 struct { type tdispatch1 struct {
outch *tracetest.Chan outch tracetest.Chan
} }
func (d tdispatch1) Dispatch(event interface{}) { func (d tdispatch1) Dispatch(event interface{}) {
......
...@@ -158,11 +158,11 @@ type EventRouter struct { ...@@ -158,11 +158,11 @@ type EventRouter struct {
mu sync.Mutex mu sync.Mutex
// XXX all unbranched events go to here? // XXX all unbranched events go to here?
defaultq *tracetest.Chan defaultq tracetest.Chan
// events specific to particular node - e.g. node starts listening, // events specific to particular node - e.g. node starts listening,
// state on that node changes, etc... // state on that node changes, etc...
byNode map[string /*host*/]*tracetest.Chan byNode map[string /*host*/]tracetest.Chan
// state on host changes. Takes precendece over byNode. // state on host changes. Takes precendece over byNode.
// //
...@@ -172,7 +172,7 @@ type EventRouter struct { ...@@ -172,7 +172,7 @@ type EventRouter struct {
// byNode("C") and simply verify events on tMC and then tC in that order. // byNode("C") and simply verify events on tMC and then tC in that order.
// keeping events local to C on tC, not tMC helps TestCluster to // keeping events local to C on tC, not tMC helps TestCluster to
// organize trace channels in uniform way ) // organize trace channels in uniform way )
byState map[string /*host*/]*tracetest.Chan byState map[string /*host*/]tracetest.Chan
// event on a-b link // event on a-b link
byLink map[string /*host-host*/]*linkDst byLink map[string /*host-host*/]*linkDst
...@@ -187,23 +187,23 @@ type EventRouter struct { ...@@ -187,23 +187,23 @@ type EventRouter struct {
// Events go to either a or b depending on which side initiated particular // Events go to either a or b depending on which side initiated particular
// connection on top of the link. // connection on top of the link.
type linkDst struct { type linkDst struct {
a *tracetest.Chan // net cause was on dialer a tracetest.Chan // net cause was on dialer
b *tracetest.Chan // net cause was on listener b tracetest.Chan // net cause was on listener
} }
func NewEventRouter() *EventRouter { func NewEventRouter() *EventRouter {
return &EventRouter{ return &EventRouter{
defaultq: tracetest.NewChan("default"), defaultq: tracetest.NewChan("default"),
byNode: make(map[string]*tracetest.Chan), byNode: make(map[string]tracetest.Chan),
byState: make(map[string]*tracetest.Chan), byState: make(map[string]tracetest.Chan),
byLink: make(map[string]*linkDst), byLink: make(map[string]*linkDst),
connected: make(map[string]bool), connected: make(map[string]bool),
} }
} }
func (r *EventRouter) AllStreams() []*tracetest.Chan { func (r *EventRouter) AllStreams() []tracetest.Chan {
rtset := map[*tracetest.Chan]int{} rtset := map[tracetest.Chan]int{}
rtset[r.defaultq] = 1 rtset[r.defaultq] = 1
for _, dst := range r.byNode { for _, dst := range r.byNode {
rtset[dst] = 1 rtset[dst] = 1
...@@ -216,7 +216,7 @@ func (r *EventRouter) AllStreams() []*tracetest.Chan { ...@@ -216,7 +216,7 @@ func (r *EventRouter) AllStreams() []*tracetest.Chan {
rtset[ldst.b] = 1 rtset[ldst.b] = 1
} }
var rtv []*tracetest.Chan var rtv []tracetest.Chan
for dst := range rtset { for dst := range rtset {
rtv = append(rtv, dst) rtv = append(rtv, dst)
} }
...@@ -245,7 +245,7 @@ func host(addr string) string { ...@@ -245,7 +245,7 @@ func host(addr string) string {
} }
// Route routes events according to rules specified via Branch*(). // Route routes events according to rules specified via Branch*().
func (r *EventRouter) RouteEvent(event interface{}) (dst *tracetest.Chan) { func (r *EventRouter) RouteEvent(event interface{}) (dst tracetest.Chan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -329,7 +329,7 @@ func (r *EventRouter) RouteEvent(event interface{}) (dst *tracetest.Chan) { ...@@ -329,7 +329,7 @@ func (r *EventRouter) RouteEvent(event interface{}) (dst *tracetest.Chan) {
} }
// routeState routes event corresponding to state change on host // routeState routes event corresponding to state change on host
func (r *EventRouter) routeState(host string) (dst *tracetest.Chan) { func (r *EventRouter) routeState(host string) (dst tracetest.Chan) {
// lookup dst by state rules // lookup dst by state rules
dst = r.byState[host] dst = r.byState[host]
if dst != nil { if dst != nil {
...@@ -341,7 +341,7 @@ func (r *EventRouter) routeState(host string) (dst *tracetest.Chan) { ...@@ -341,7 +341,7 @@ func (r *EventRouter) routeState(host string) (dst *tracetest.Chan) {
} }
// BranchNode branches events corresponding to host. // BranchNode branches events corresponding to host.
func (r *EventRouter) BranchNode(host string, dst *tracetest.Chan) { func (r *EventRouter) BranchNode(host string, dst tracetest.Chan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -355,7 +355,7 @@ func (r *EventRouter) BranchNode(host string, dst *tracetest.Chan) { ...@@ -355,7 +355,7 @@ func (r *EventRouter) BranchNode(host string, dst *tracetest.Chan) {
// BranchState branches events corresponding to state changes on host. // BranchState branches events corresponding to state changes on host.
// //
// XXX not needed? // XXX not needed?
func (r *EventRouter) BranchState(host string, dst *tracetest.Chan) { func (r *EventRouter) BranchState(host string, dst tracetest.Chan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
...@@ -375,7 +375,7 @@ func (r *EventRouter) BranchState(host string, dst *tracetest.Chan) { ...@@ -375,7 +375,7 @@ func (r *EventRouter) BranchState(host string, dst *tracetest.Chan) {
// networking cause root coming from b - go to dstb. // networking cause root coming from b - go to dstb.
// //
// XXX extend to support multiple ongoing streams (e.g. prefetch) ? // XXX extend to support multiple ongoing streams (e.g. prefetch) ?
func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.Chan) { func (r *EventRouter) BranchLink(link string, dsta, dstb tracetest.Chan) {
r.mu.Lock() r.mu.Lock()
defer r.mu.Unlock() defer r.mu.Unlock()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment