Commit cc4f3f2c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 1bca75a7
...@@ -94,44 +94,7 @@ var ( ...@@ -94,44 +94,7 @@ var (
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock") deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
) )
// _Chan provides synchronous channel with additional property that send // _Msg represents message with 1 event sent over _chan.
// blocks until receiving side explicitly acknowledges message was received and
// processed.
//
// New channels must be created via _NewChan.
//
// It is safe to use _Chan from multiple goroutines simultaneously.
type _Chan interface {
_ChanTx
_ChanRx
name() string // name of the channel
}
// _ChanTx represents "send-only" half of _Chan.
// It is similar to chan<- .
type _ChanTx interface {
// Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics.
Send(event interface{})
// Close closes the sending side of the channel.
Close()
}
// _ChanRx represents "receive-only" half of _Chan.
// It is similar to <-chan .
type _ChanRx interface {
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
Recv() *_Msg
// _rxq returns raw channel that is serving _ChanRx.
// it is used internally to use _ChanRx in select.
_rxq() <-chan *_Msg
}
// _Msg represents message with 1 event sent over _Chan.
// //
// The goroutine which sent the message will wait for Ack before continue. // The goroutine which sent the message will wait for Ack before continue.
type _Msg struct { type _Msg struct {
...@@ -141,69 +104,69 @@ type _Msg struct { ...@@ -141,69 +104,69 @@ type _Msg struct {
// _chan implements _Chan. // _chan provides synchronous channel associated with a stream.
//
// It comes with additional property that send blocks until receiving side
// explicitly acknowledges message was received and processed.
//
// New channels must be created via T.newChan.
//
// It is safe to use _chan from multiple goroutines simultaneously.
type _chan struct { type _chan struct {
t *T t *T // created for stream <.name> under <.t>
name string // name of the channel/stream
msgq chan *_Msg msgq chan *_Msg
down chan struct{} // becomes ready when closed down chan struct{} // becomes ready when closed
_name string
} }
// name implements _Chan. // Send sends event to a consumer and waits for ack.
func (ch *_chan) name() string { // if main testing goroutine detects any problem Send panics.
return ch._name
}
// Send implements _ChanTx.
func (ch *_chan) Send(event interface{}) { func (ch *_chan) Send(event interface{}) {
if *chatty { if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name(), event, event) fmt.Printf("%s <- %T %v\n", ch.name, event, event)
} }
ack := make(chan error) ack := make(chan error)
select { select {
case <-ch.down: case <-ch.down:
ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name()) ch.t.fatalfInNonMain("%s: send: channel was closed", ch.name)
case ch.msgq <- &_Msg{event, ack}: case ch.msgq <- &_Msg{event, ack}:
err := <-ack err := <-ack
if err != nil { if err != nil {
ch.t.fatalfInNonMain("%s: send: %s", ch.name(), err) ch.t.fatalfInNonMain("%s: send: %s", ch.name, err)
} }
} }
} }
// Close implements _ChanTx. // Close closes the sending side of the channel.
func (ch *_chan) Close() { func (ch *_chan) Close() {
close(ch.down) // note - not .msgq close(ch.down) // note - not .msgq
} }
// Recv implements _ChanRx. // Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
func (ch *_chan) Recv() *_Msg { func (ch *_chan) Recv() *_Msg {
msg := <-ch.msgq msg := <-ch.msgq
return msg return msg
} }
// _rxq implements _ChanRx.
func (ch *_chan) _rxq() <-chan *_Msg {
return ch.msgq
}
// XXX -> Unpause? Cont? Continue? // XXX -> Unpause? Cont? Continue?
// Ack acknowledges the event was processed and unblocks producer goroutine. // Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *_Msg) Ack() { func (m *_Msg) Ack() {
m.ack <- nil m.ack <- nil
} }
// XXX it should be called only by tracetest internals from under Fatal // nak tells sender that event verification failed and why.
// it is called only by tracetest internals.
func (m *_Msg) nak(why string) { func (m *_Msg) nak(why string) {
m.ack <- errors.New(why) m.ack <- errors.New(why)
} }
// _NewChan creates new _Chan channel. // newChan creates new _chan channel.
func _NewChan(name string) _Chan { func (t *T) newChan(name string) *_chan {
// XXX somehow avoid channels with duplicate names // NOTE T ensures not to create channels with duplicate names.
// (only allow to create named channels from under dispatcher?) return &_chan{t: t, name: name, msgq: make(chan *_Msg), down: make(chan struct{}), }
return &_chan{msgq: make(chan *_Msg), down: make(chan struct{}), _name: name}
} }
...@@ -225,7 +188,7 @@ type T struct { ...@@ -225,7 +188,7 @@ type T struct {
_testing_TB _testing_TB
mu sync.Mutex mu sync.Mutex
streamTab map[/*stream*/string]_Chan // set to nil on test shutdown streamTab map[/*stream*/string]*_chan // set to nil on test shutdown
routeEvent func(event interface{}) (stream string) routeEvent func(event interface{}) (stream string)
tracev []eventTrace // record of events as they happen tracev []eventTrace // record of events as they happen
delayInjectTab map[/*stream*/string]*delayInjectState delayInjectTab map[/*stream*/string]*delayInjectState
...@@ -317,15 +280,14 @@ func (t *T) RxEvent(event interface{}) { ...@@ -317,15 +280,14 @@ func (t *T) RxEvent(event interface{}) {
// chanForStream returns channel corresponding to stream. // chanForStream returns channel corresponding to stream.
// must be called under mu. // must be called under mu.
func (t *T) chanForStream(stream string) _Chan { func (t *T) chanForStream(stream string) *_chan {
if t.streamTab == nil { if t.streamTab == nil {
return nil // t is no longer operational after e.g. deadlock return nil // t is no longer operational after e.g. deadlock
} }
ch, ok := t.streamTab[stream] ch, ok := t.streamTab[stream]
if !ok { if !ok {
ch = _NewChan(stream) ch = t.newChan(stream)
ch.(*_chan).t = t // XXX move into _NewChan
t.streamTab[stream] = ch t.streamTab[stream] = ch
} }
return ch return ch
...@@ -383,7 +345,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg { ...@@ -383,7 +345,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg {
var msg *_Msg var msg *_Msg
select { select {
case msg = <-ch._rxq(): // unwrapped Recv case msg = <-ch.msgq: // unwrapped Recv
// ok // ok
case <-time.After(*deadTime): case <-time.After(*deadTime):
...@@ -393,7 +355,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg { ...@@ -393,7 +355,7 @@ func (t *T) xget1(stream string, eventp interface{}) *_Msg {
reventp := reflect.ValueOf(eventp) reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) { if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
t.queuenak(msg, "unexpected event type") t.queuenak(msg, "unexpected event type")
t.Fatalf("%s: expect: %s: got %T %v", ch.name(), reventp.Elem().Type(), msg.Event, msg.Event) t.Fatalf("%s: expect: %s: got %T %v", ch.name, reventp.Elem().Type(), msg.Event, msg.Event)
} }
// *eventp = msg.Event // *eventp = msg.Event
...@@ -455,9 +417,9 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -455,9 +417,9 @@ func (t *T) closeStreamTab() (nnak int) {
} }
// print details about pending events and all streams // print details about pending events and all streams
type sendInfo struct{ch _Chan; msg *_Msg} type sendInfo struct{ch *_chan; msg *_Msg}
var sendv []sendInfo // sends are pending here var sendv []sendInfo // sends are pending here
var quietv []_Chan // this channels are quiet var quietv []*_chan // this channels are quiet
// order channels by name // order channels by name
var streams []string var streams []string
...@@ -472,7 +434,7 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -472,7 +434,7 @@ func (t *T) closeStreamTab() (nnak int) {
ch := streamTab[stream] ch := streamTab[stream]
// check whether someone is sending on channels without blocking. // check whether someone is sending on channels without blocking.
select { select {
case msg := <-ch._rxq(): case msg := <-ch.msgq:
sendv = append(sendv, sendInfo{ch, msg}) sendv = append(sendv, sendInfo{ch, msg})
default: default:
quietv = append(quietv, ch) quietv = append(quietv, ch)
...@@ -481,10 +443,10 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -481,10 +443,10 @@ func (t *T) closeStreamTab() (nnak int) {
pending := fmt.Sprintf("test shutdown: #streams: %d, #(pending events): %d\n", len(streams), len(sendv)) pending := fmt.Sprintf("test shutdown: #streams: %d, #(pending events): %d\n", len(streams), len(sendv))
for _, __ := range sendv { for _, __ := range sendv {
pending += fmt.Sprintf("%s\t<- %T %v\n", __.ch.name(), __.msg.Event, __.msg.Event) pending += fmt.Sprintf("%s\t<- %T %v\n", __.ch.name, __.msg.Event, __.msg.Event)
} }
for _, ch := range quietv { for _, ch := range quietv {
pending += fmt.Sprintf("# %s\n", ch.name()) pending += fmt.Sprintf("# %s\n", ch.name)
} }
// log the details and nak senders that we received from. // log the details and nak senders that we received from.
...@@ -522,7 +484,7 @@ func Run(t testing.TB, f func(t *T)) { ...@@ -522,7 +484,7 @@ func Run(t testing.TB, f func(t *T)) {
func run(t testing.TB, f func(t *T), delayInjectTab map[string]*delayInjectState) *T { func run(t testing.TB, f func(t *T), delayInjectTab map[string]*delayInjectState) *T {
tT := &T{ tT := &T{
_testing_TB: t, _testing_TB: t,
streamTab: make(map[string]_Chan), streamTab: make(map[string]*_chan),
delayInjectTab: delayInjectTab, delayInjectTab: delayInjectTab,
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment