Commit 28666738 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a636a159
// Copyright (C) 2017-2019 Nexedi SA and Contributors.
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -26,9 +26,9 @@
// each other by causality or so called happens-before relation.
//
// However in a concurrent system one can decompose all events into serial
// streams in which events are strictly ordered by causality with respect to
// each other. This decomposition in turn allows to verify that in every stream
// events happenned as expected.
// streams in which events should be strictly ordered by causality with respect
// to each other. This decomposition in turn allows to verify that in every
// stream events happenned as expected.
//
// Verification of events for all streams can be done by one *sequential*
// process:
......@@ -62,7 +62,7 @@
// router says.
//
// - for every serial stream of events create synchronous delivery channel
// (SyncChan) and event Checker. XXX
// (Chan) and event Checker. XXX
//
//
// XXX more text describing how to use the package.
......@@ -73,7 +73,7 @@
// another comes at different trace channel.
//
// XXX (if tested system is serial only there is no need to use Dispatcher and
// routing - the collector can send output directly to the only SyncChan with
// routing - the collector can send output directly to the only Chan with
// only one EventChecker connected to it).
package tracetest
......@@ -91,29 +91,36 @@ import (
var chatty = flag.Bool("tracetest.v", false, "verbose: print events as they are sent on trace channels")
// SyncChan provides synchronous channel with additional property that send
// Chan provides synchronous channel with additional property that send
// blocks until receiving side explicitly acknowledges message was received and
// processed.
//
// New channels must be created via NewSyncChan.
// New channels must be created via NewChan.
//
// It is safe to use SyncChan from multiple goroutines simultaneously.
//
// XXX -> Chan
type SyncChan struct {
msgq chan *SyncMsg
// It is safe to use Chan from multiple goroutines simultaneously.
type Chan struct {
msgq chan *Msg
name string
}
// Msg represents message with 1 event sent over Chan.
//
// The goroutine which sent the message will wait for Ack before continue.
type Msg struct {
Event interface {}
ack chan<- bool
}
// Send sends event to a consumer and waits for ack.
//
// if main testing goroutine detects any problem Send panics. XXX
func (ch *SyncChan) Send(event interface{}) {
func (ch *Chan) Send(event interface{}) {
if *chatty {
fmt.Printf("%s <- %T %v\n", ch.name, event, event)
}
ack := make(chan bool)
ch.msgq <- &SyncMsg{event, ack}
ch.msgq <- &Msg{event, ack}
ok := <-ack
if !ok {
panic(fmt.Sprintf("%s: send: deadlock", ch.name))
......@@ -123,31 +130,21 @@ func (ch *SyncChan) Send(event interface{}) {
// Recv receives message from a producer.
//
// The consumer, after dealing with the message, must send back an ack.
func (ch *SyncChan) Recv() *SyncMsg {
func (ch *Chan) Recv() *Msg {
msg := <-ch.msgq
return msg
}
// SyncMsg represents message with 1 event sent over SyncChan.
//
// The goroutine which sent the message will wait for Ack before continue.
//
// XXX -> Event?
type SyncMsg struct {
Event interface {}
ack chan<- bool
}
// Ack acknowledges the event was processed and unblocks producer goroutine.
func (m *SyncMsg) Ack() {
func (m *Msg) Ack() {
m.ack <- true
}
// NewSyncChan creates new SyncChan channel.
func NewSyncChan(name string) *SyncChan {
// NewChan creates new Chan channel.
func NewChan(name string) *Chan {
// XXX somehow avoid channels with duplicate names
// (only allow to create named channels from under dispatcher?)
return &SyncChan{msgq: make(chan *SyncMsg), name: name}
return &Chan{msgq: make(chan *Msg), name: name}
}
......@@ -155,10 +152,10 @@ func NewSyncChan(name string) *SyncChan {
// EventChecker is testing utility to verify that sequence of events coming
// from a single SyncChan is as expected.
// from a single Chan is as expected.
type EventChecker struct {
t testing.TB
in *SyncChan
in *Chan
dispatch *EventDispatcher
}
......@@ -166,7 +163,7 @@ type EventChecker struct {
// `in` and use `t` for tests reporting.
//
// XXX -> dispatch.NewChecker() ?
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *SyncChan) *EventChecker {
func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *Chan) *EventChecker {
return &EventChecker{t: t, in: in, dispatch: dispatch}
}
......@@ -174,9 +171,9 @@ func NewEventChecker(t testing.TB, dispatch *EventDispatcher, in *SyncChan) *Eve
//
// if checks do not pass - fatal testing error is raised
// XXX why eventp, not just event here?
func (evc *EventChecker) xget1(eventp interface{}) *SyncMsg {
func (evc *EventChecker) xget1(eventp interface{}) *Msg {
evc.t.Helper()
var msg *SyncMsg
var msg *Msg
select {
case msg = <-evc.in.msgq: // unwrapped Recv
......@@ -200,7 +197,7 @@ func (evc *EventChecker) xget1(eventp interface{}) *SyncMsg {
// expect1 asks checker to expect next event to be eventExpect (both type and value)
//
// if checks do not pass - fatal testing error is raised.
func (evc *EventChecker) expect1(eventExpect interface{}) *SyncMsg {
func (evc *EventChecker) expect1(eventExpect interface{}) *Msg {
evc.t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
......@@ -237,7 +234,7 @@ func (evc *EventChecker) Expect(expected interface{}) {
// send ACK back by itself.
//
// If check does not pass - fatal testing error is raised.
func (evc *EventChecker) ExpectNoACK(expected interface{}) *SyncMsg {
func (evc *EventChecker) ExpectNoACK(expected interface{}) *Msg {
evc.t.Helper()
msg := evc.expect1(expected)
......@@ -258,7 +255,7 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
dstv := rt.AllRoutes()
bad := fmt.Sprintf("%s: deadlock waiting for %T\n", evc.in.name, eventp)
type sendInfo struct{dst *SyncChan; event interface{}}
type sendInfo struct{dst *Chan; event interface{}}
var sendv []sendInfo
for _, dst := range dstv {
// check whether someone is sending on a dst without blocking.
......@@ -298,18 +295,18 @@ func (evc *EventChecker) deadlock(eventp interface{}) {
// ----------------------------------------
// EventRouter is the interface used for routing events to appropriate output SyncChan.
// EventRouter is the interface used for routing events to appropriate output Chan.
//
// It should be safe to use EventRouter from multiple goroutines simultaneously.
type EventRouter interface {
// Route should return appropriate destination for event.
Route(event interface{}) *SyncChan
Route(event interface{}) *Chan
// AllRoutes should return all routing destinations.
AllRoutes() []*SyncChan
AllRoutes() []*Chan
}
// EventDispatcher dispatches events to appropriate SyncChan for checking
// EventDispatcher dispatches events to appropriate Chan for checking
// according to provided router.
type EventDispatcher struct {
rt EventRouter
......
......@@ -50,7 +50,7 @@ import (
func TestMasterStorage(t0 *testing.T) {
X := exc.Raiseif
t := NewTestCluster(t0, "abc1")
t := tNewCluster(t0, "abc1")
defer t.Stop()
M := t.NewMaster("m")
......@@ -428,7 +428,7 @@ func TestMasterStorage(t0 *testing.T) {
// XXX hack - better we don't need it.
// XXX -> with testenv.MkCluster() we won't need it
type tdispatch1 struct {
outch *tracetest.SyncChan
outch *tracetest.Chan
}
func (d tdispatch1) Dispatch(event interface{}) {
......@@ -448,7 +448,7 @@ func benchmarkGetObject(b *testing.B, Mnet, Snet, Cnet xnet.Networker, benchit f
M := tNewMaster("abc1", "", Mnet)
// XXX to wait for "M listens at ..." & "ready to start" -> XXX add something to M api?
cG := tracetest.NewSyncChan("main")
cG := tracetest.NewChan("main")
tG := tracetest.NewEventChecker(b, nil /* XXX */, cG)
tracer := NewTraceCollector(tdispatch1{cG})
......
......@@ -36,13 +36,16 @@ import (
)
// TestCluster ... XXX
type TestCluster struct {
name string
network *pipenet.Network // XXX -> lo
// tCluster is a test NEO cluster NEO.
// Create it with tNewCluster.
type tCluster struct {
testing.TB // original testing env this cluster was created at
gotracer *TraceCollector // XXX -> GoTracer
//tpy *PyTracer
name string // name of the cluster
network *pipenet.Network // nodes interoperate via netowrk XXX -> lo
gotracer *TraceCollector // for tracing go nodes XXX -> GoTracer
//tpy *PyTracer // for tracing py nodes
erouter *EventRouter
edispatch *tracetest.EventDispatcher
......@@ -50,8 +53,6 @@ type TestCluster struct {
tabMu sync.Mutex
nodeTab map[string/*node*/]*tNode
checkTab map[string/*node*/]*tracetest.EventChecker
testing.TB // original testing env this cluster was created at
}
// tNode represents information about a test node ... XXX
......@@ -75,13 +76,13 @@ type ITestClient interface {
zodb.IStorageDriver
}
// NewTestCluster creates new NEO test cluster.
// tNewCluster creates new NEO test cluster.
//
// XXX ...
//
// XXX defer t.Stop()
func NewTestCluster(ttest testing.TB, name string) *TestCluster {
t := &TestCluster{
func tNewCluster(ttest testing.TB, name string) *tCluster {
t := &tCluster{
name: name,
network: pipenet.New("testnet"), // test network
......@@ -104,7 +105,7 @@ func NewTestCluster(ttest testing.TB, name string) *TestCluster {
//
// All processes of the cluster are stopped ... XXX
// XXX do we need error return?
func (t *TestCluster) Stop() error {
func (t *tCluster) Stop() error {
//... XXX
t.gotracer.Detach()
//XXX t.pytracer.Detach()
......@@ -117,7 +118,7 @@ func (t *TestCluster) Stop() error {
//
// name might be of "node" or "node1-node2" kind. XXX more text
// node or node1/node2 must be already registered.
func (t *TestCluster) Checker(name string) *tracetest.EventChecker {
func (t *tCluster) Checker(name string) *tracetest.EventChecker {
t.tabMu.Lock()
defer t.tabMu.Unlock()
......@@ -134,7 +135,7 @@ func (t *TestCluster) Checker(name string) *tracetest.EventChecker {
// the node is registered in .nodeTab and .checkTab is populated ... XXX
//
// the node must not be registered before.
func (t *TestCluster) registerNewNode(name string) *tNode {
func (t *tCluster) registerNewNode(name string) *tNode {
t.tabMu.Lock()
defer t.tabMu.Unlock()
......@@ -144,16 +145,16 @@ func (t *TestCluster) registerNewNode(name string) *tNode {
}
// tracechecker for events on node
c1 := tracetest.NewSyncChan(name) // trace of events local to node
c1 := tracetest.NewChan(name) // trace of events local to node
t.erouter.BranchNode(name, c1)
t.checkTab[name] = tracetest.NewEventChecker(t.TB, t.edispatch, c1)
// tracecheckers for events on links of all node1-node2 pairs
for name2 := range t.nodeTab {
// trace of events with cause root being node1 -> node2 send
c12 := tracetest.NewSyncChan(name + "-" + name2)
c12 := tracetest.NewChan(name + "-" + name2)
// ----//---- node2 -> node1 send
c21 := tracetest.NewSyncChan(name2 + "-" + name)
c21 := tracetest.NewChan(name2 + "-" + name)
t12 := tracetest.NewEventChecker(t.TB, t.edispatch, c12)
t21 := tracetest.NewEventChecker(t.TB, t.edispatch, c21)
......@@ -177,7 +178,7 @@ func (t *TestCluster) registerNewNode(name string) *tNode {
// The node must be not yet existing and will be dedicated to the created master fully. XXX
//
// XXX error of creating py process?
func (t *TestCluster) NewMaster(name string) ITestMaster {
func (t *tCluster) NewMaster(name string) ITestMaster {
node := t.registerNewNode(name)
m := tNewMaster(t.name, ":1", node.net)
......@@ -187,14 +188,14 @@ func (t *TestCluster) NewMaster(name string) ITestMaster {
return m
}
func (t *TestCluster) NewStorage(name, masterAddr string, back storage.Backend) ITestStorage {
func (t *tCluster) NewStorage(name, masterAddr string, back storage.Backend) ITestStorage {
node := t.registerNewNode(name)
s := tNewStorage(t.name, masterAddr, ":1", node.net, back)
t.gotracer.RegisterNode(s.node, name)
return s
}
func (t *TestCluster) NewClient(name, masterAddr string) ITestClient {
func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
node := t.registerNewNode(name)
c := NewClient(t.name, masterAddr, node.net)
t.gotracer.RegisterNode(c.node, name)
......
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......@@ -157,11 +157,11 @@ func δnode(where string, laddr string, typ proto.NodeType, num int32, state pro
type EventRouter struct {
mu sync.Mutex
defaultq *tracetest.SyncChan
defaultq *tracetest.Chan
// events specific to particular node - e.g. node starts listening,
// state on that node changes, etc...
byNode map[string /*host*/]*tracetest.SyncChan
byNode map[string /*host*/]*tracetest.Chan
// state on host changes. Takes precendece over byNode.
//
......@@ -171,7 +171,7 @@ type EventRouter struct {
// byNode("C") and simply verify events on tMC and then tC in that order.
// keeping events local to C on tC, not tMC helps TestCluster to
// organize trace channels in uniform way )
byState map[string /*host*/]*tracetest.SyncChan
byState map[string /*host*/]*tracetest.Chan
// event on a-b link
byLink map[string /*host-host*/]*linkDst
......@@ -183,16 +183,16 @@ type EventRouter struct {
func NewEventRouter() *EventRouter {
return &EventRouter{
defaultq: tracetest.NewSyncChan("default"),
byNode: make(map[string]*tracetest.SyncChan),
byState: make(map[string]*tracetest.SyncChan),
defaultq: tracetest.NewChan("default"),
byNode: make(map[string]*tracetest.Chan),
byState: make(map[string]*tracetest.Chan),
byLink: make(map[string]*linkDst),
connected: make(map[string]bool),
}
}
func (r *EventRouter) AllRoutes() []*tracetest.SyncChan {
rtset := map[*tracetest.SyncChan]int{}
func (r *EventRouter) AllRoutes() []*tracetest.Chan {
rtset := map[*tracetest.Chan]int{}
rtset[r.defaultq] = 1
for _, dst := range r.byNode {
rtset[dst] = 1
......@@ -205,7 +205,7 @@ func (r *EventRouter) AllRoutes() []*tracetest.SyncChan {
rtset[ldst.b] = 1
}
var rtv []*tracetest.SyncChan
var rtv []*tracetest.Chan
for dst := range rtset {
rtv = append(rtv, dst)
}
......@@ -234,7 +234,7 @@ func host(addr string) string {
}
// Route routes events according to rules specified via Branch*().
func (r *EventRouter) Route(event interface{}) (dst *tracetest.SyncChan) {
func (r *EventRouter) Route(event interface{}) (dst *tracetest.Chan) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -318,7 +318,7 @@ func (r *EventRouter) Route(event interface{}) (dst *tracetest.SyncChan) {
}
// routeState routes event corresponding to state change on host
func (r *EventRouter) routeState(host string) (dst *tracetest.SyncChan) {
func (r *EventRouter) routeState(host string) (dst *tracetest.Chan) {
// lookup dst by state rules
dst = r.byState[host]
if dst != nil {
......@@ -330,7 +330,7 @@ func (r *EventRouter) routeState(host string) (dst *tracetest.SyncChan) {
}
// BranchNode branches events corresponding to host.
func (r *EventRouter) BranchNode(host string, dst *tracetest.SyncChan) {
func (r *EventRouter) BranchNode(host string, dst *tracetest.Chan) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -344,7 +344,7 @@ func (r *EventRouter) BranchNode(host string, dst *tracetest.SyncChan) {
// BranchState branches events corresponding to state changes on host.
//
// XXX not needed?
func (r *EventRouter) BranchState(host string, dst *tracetest.SyncChan) {
func (r *EventRouter) BranchState(host string, dst *tracetest.Chan) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -364,7 +364,7 @@ func (r *EventRouter) BranchState(host string, dst *tracetest.SyncChan) {
// networking cause root coming from b - go to dstb.
//
// XXX extend to support multiple ongoing streams (e.g. prefetch) ?
func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.SyncChan) {
func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.Chan) {
r.mu.Lock()
defer r.mu.Unlock()
......@@ -382,7 +382,7 @@ func (r *EventRouter) BranchLink(link string, dsta, dstb *tracetest.SyncChan) {
// Events go to either a or b depending on which side initiated particular
// connection on top of the link.
type linkDst struct {
a *tracetest.SyncChan // net cause was on dialer
b *tracetest.SyncChan // net cause was on listener
a *tracetest.Chan // net cause was on dialer
b *tracetest.Chan // net cause was on listener
}
// Copyright (C) 2017-2018 Nexedi SA and Contributors.
// Copyright (C) 2017-2020 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment