Commit 3055a942 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 15d9be2f
...@@ -99,46 +99,44 @@ var ( ...@@ -99,46 +99,44 @@ var (
deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock") deadTime = flag.Duration("tracetest.deadtime", 3*time.Second, "time after which no events activity is considered to be a deadlock")
) )
// XXX hide Chan from public API // _Chan provides synchronous channel with additional property that send
// Chan provides synchronous channel with additional property that send
// blocks until receiving side explicitly acknowledges message was received and // blocks until receiving side explicitly acknowledges message was received and
// processed. // processed.
// //
// New channels must be created via NewChan. // New channels must be created via _NewChan.
// //
// It is safe to use Chan from multiple goroutines simultaneously. // It is safe to use _Chan from multiple goroutines simultaneously.
type Chan interface { type _Chan interface {
ChanTx _ChanTx
ChanRx _ChanRx
name() string // name of the channel name() string // name of the channel
} }
// ChanTx represents "send-only" half of Chan. // _ChanTx represents "send-only" half of _Chan.
// It is similar to chan<- . // It is similar to chan<- .
type ChanTx interface { type _ChanTx interface {
// Send sends event to a consumer and waits for ack. // Send sends event to a consumer and waits for ack.
// if main testing goroutine detects any problem Send panics. XXX // if main testing goroutine detects any problem Send panics.
Send(event interface{}) Send(event interface{})
// Close closes the sending side of the channel. // Close closes the sending side of the channel.
Close() Close()
} }
// ChanRx represents "receive-only" half of Chan. // _ChanRx represents "receive-only" half of _Chan.
// It is similar to <-chan . // It is similar to <-chan .
type ChanRx interface { type _ChanRx interface {
// Recv receives message from a producer. // Recv receives message from a producer.
// //
// The consumer, after dealing with the message, must send back an ack. // The consumer, after dealing with the message, must send back an ack.
Recv() *Msg Recv() *Msg
// _rxq returns raw channel that is serving ChanRx. // _rxq returns raw channel that is serving _ChanRx.
// it is used internally to use ChanRx in select. // it is used internally to use _ChanRx in select.
_rxq() <-chan *Msg _rxq() <-chan *Msg
} }
// Msg represents message with 1 event sent over Chan. // Msg represents message with 1 event sent over _Chan.
// //
// The goroutine which sent the message will wait for Ack before continue. // The goroutine which sent the message will wait for Ack before continue.
type Msg struct { type Msg struct {
...@@ -148,7 +146,7 @@ type Msg struct { ...@@ -148,7 +146,7 @@ type Msg struct {
// _chan implements Chan. // _chan implements _Chan.
type _chan struct { type _chan struct {
t *T t *T
msgq chan *Msg msgq chan *Msg
...@@ -156,12 +154,12 @@ type _chan struct { ...@@ -156,12 +154,12 @@ type _chan struct {
_name string _name string
} }
// name implements Chan. // name implements _Chan.
func (ch *_chan) name() string { func (ch *_chan) name() string {
return ch._name return ch._name
} }
// Send implements ChanTx. // Send implements _ChanTx.
func (ch *_chan) Send(event interface{}) { func (ch *_chan) Send(event interface{}) {
if *chatty { if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name(), event, event) fmt.Printf("%s <- %T %v\n", ch.name(), event, event)
...@@ -179,18 +177,18 @@ func (ch *_chan) Send(event interface{}) { ...@@ -179,18 +177,18 @@ func (ch *_chan) Send(event interface{}) {
} }
} }
// Close implements ChanTx. // Close implements _ChanTx.
func (ch *_chan) Close() { func (ch *_chan) Close() {
close(ch.down) // note - not .msgq close(ch.down) // note - not .msgq
} }
// Recv implements ChanRx. // Recv implements _ChanRx.
func (ch *_chan) Recv() *Msg { func (ch *_chan) Recv() *Msg {
msg := <-ch.msgq msg := <-ch.msgq
return msg return msg
} }
// _rxq implements ChanRx. // _rxq implements _ChanRx.
func (ch *_chan) _rxq() <-chan *Msg { func (ch *_chan) _rxq() <-chan *Msg {
return ch.msgq return ch.msgq
} }
...@@ -206,8 +204,8 @@ func (m *Msg) nak(why string) { ...@@ -206,8 +204,8 @@ func (m *Msg) nak(why string) {
m.ack <- errors.New(why) m.ack <- errors.New(why)
} }
// NewChan creates new Chan channel. // _NewChan creates new _Chan channel.
func NewChan(name string) Chan { func _NewChan(name string) _Chan {
// XXX somehow avoid channels with duplicate names // XXX somehow avoid channels with duplicate names
// (only allow to create named channels from under dispatcher?) // (only allow to create named channels from under dispatcher?)
return &_chan{msgq: make(chan *Msg), down: make(chan struct{}), _name: name} return &_chan{msgq: make(chan *Msg), down: make(chan struct{}), _name: name}
...@@ -496,7 +494,7 @@ type T struct { ...@@ -496,7 +494,7 @@ type T struct {
_testing_TB _testing_TB
mu sync.Mutex mu sync.Mutex
streamTab map[/*stream*/string]Chan // set to nil on test shutdown streamTab map[/*stream*/string]_Chan // set to nil on test shutdown
routeEvent func(event interface{}) (stream string) routeEvent func(event interface{}) (stream string)
tracev []eventTrace // record of events as they happen tracev []eventTrace // record of events as they happen
delayInjectTab map[/*stream*/string]*delayInjectState delayInjectTab map[/*stream*/string]*delayInjectState
...@@ -608,15 +606,15 @@ func (t *T) RxEvent(event interface{}) { ...@@ -608,15 +606,15 @@ func (t *T) RxEvent(event interface{}) {
// chanForStream returns channel corresponding to stream. // chanForStream returns channel corresponding to stream.
// must be called under mu. // must be called under mu.
func (t *T) chanForStream(stream string) Chan { func (t *T) chanForStream(stream string) _Chan {
if t.streamTab == nil { if t.streamTab == nil {
return nil // t is no longer operational after e.g. deadlock return nil // t is no longer operational after e.g. deadlock
} }
ch, ok := t.streamTab[stream] ch, ok := t.streamTab[stream]
if !ok { if !ok {
ch = NewChan(stream) ch = _NewChan(stream)
ch.(*_chan).t = t // XXX move into NewChan ch.(*_chan).t = t // XXX move into _NewChan
t.streamTab[stream] = ch t.streamTab[stream] = ch
} }
return ch return ch
...@@ -746,9 +744,9 @@ func (t *T) closeStreamTab() (nnak int) { ...@@ -746,9 +744,9 @@ func (t *T) closeStreamTab() (nnak int) {
} }
// print details about pending events and all streams // print details about pending events and all streams
type sendInfo struct{ch Chan; msg *Msg} type sendInfo struct{ch _Chan; msg *Msg}
var sendv []sendInfo // sends are pending here var sendv []sendInfo // sends are pending here
var quietv []Chan // this channels are quiet var quietv []_Chan // this channels are quiet
// order channels by name // order channels by name
var streams []string var streams []string
...@@ -817,7 +815,7 @@ func Verify(t *testing.T, f func(t *T)) { ...@@ -817,7 +815,7 @@ func Verify(t *testing.T, f func(t *T)) {
testf := func(t testing.TB, delayInjectTab map[string]*delayInjectState) *T { testf := func(t testing.TB, delayInjectTab map[string]*delayInjectState) *T {
tT := &T{ tT := &T{
_testing_TB: t, _testing_TB: t,
streamTab: make(map[string]Chan), streamTab: make(map[string]_Chan),
delayInjectTab: delayInjectTab, delayInjectTab: delayInjectTab,
} }
......
...@@ -428,7 +428,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -428,7 +428,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
obj1, err := zback.Load(ctx, xid1) obj1, err := zback.Load(ctx, xid1)
if err != nil { if err != nil {
b.Fatal(err) t.Fatal(err)
} }
buf1, serial1 := obj1.Data, obj1.Serial buf1, serial1 := obj1.Data, obj1.Serial
...@@ -436,11 +436,11 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f ...@@ -436,11 +436,11 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
xcload1 := func() { xcload1 := func() {
cbuf1, cserial1, err := C.Load(ctx, xid1) cbuf1, cserial1, err := C.Load(ctx, xid1)
if err != nil { if err != nil {
b.Fatal(err) t.Fatal(err)
} }
if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) { if !(bytes.Equal(cbuf1.Data, buf1.Data) && cserial1 == serial1) {
b.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1) t.Fatalf("C.Load first -> %q %v ; want %q %v", cbuf1.Data, cserial1, buf1.Data, serial1)
} }
cbuf1.Release() cbuf1.Release()
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment