Commit 1257f0f3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d31f1348
......@@ -19,10 +19,10 @@ package server
// test interaction between nodes
import (
//"bytes"
"bytes"
"context"
//"io"
//"reflect"
"reflect"
"testing"
//"../../neo/client"
......@@ -30,10 +30,13 @@ import (
//"../../zodb"
"../../zodb/storage/fs1"
"../../xcommon/xnet"
"../../xcommon/xnet/pipenet"
"../../xcommon/xsync"
"lab.nexedi.com/kirr/go123/exc"
"fmt"
)
// XXX dup from connection_test
......@@ -48,9 +51,103 @@ func xfs1stor(path string) *fs1.FileStorage {
return zstor
}
// traceMsg represents one tracing communication
// the goroutine which produced it will wait for send on ack before continue
type traceMsg struct {
event interface {} // xnet.Trace* | ...
ack chan struct{}
}
// TraceChecker synchronously collects and checks tracing events
// it collects events from several sources and sends them all into one channel
// for each event the goroutine which produced it will wait for ack before continue
// XXX more text XXX naming -> verifier?
type TraceChecker struct {
t *testing.T
msgch chan *traceMsg // XXX or chan traceMsg (no ptr) ?
}
func NewTraceChecker(t *testing.T) *TraceChecker {
return &TraceChecker{t: t, msgch: make(chan *traceMsg)}
}
// get1 gets 1 event in place and checks it has expected type
func (tc *TraceChecker) xget1(eventp interface{}) *traceMsg {
println("xget1: entry")
msg := <-tc.msgch
println("xget1: msg", msg)
revp := reflect.ValueOf(eventp)
if revp.Type().Elem() != reflect.TypeOf(msg.event) {
tc.t.Fatalf("expected %s; got %#v", revp.Elem().Type(), msg.event)
}
// TODO *event = msg.event
return msg
}
// trace1 sends message with one tracing event to consumer
func (tc *TraceChecker) trace1(event interface{}) {
ack := make(chan struct{})
fmt.Printf("I: %v ...", event)
println("zzz")
//panic(0)
tc.msgch <- &traceMsg{event, ack}
<-ack
fmt.Printf(" ok\n")
}
func (tc *TraceChecker) TraceNetDial(ev *xnet.TraceDial) { tc.trace1(ev) }
func (tc *TraceChecker) TraceNetListen(ev *xnet.TraceListen) { tc.trace1(ev) }
func (tc *TraceChecker) TraceNetTx(ev *xnet.TraceTx) { tc.trace1(ev) }
// Expect instruct checker to expect next event to be ...
// XXX
func (tc *TraceChecker) ExpectNetDial(dst string) {
var ev *xnet.TraceDial
msg := tc.xget1(&ev)
if ev.Dst != dst {
tc.t.Fatalf("net dial: have %v; want: %v", ev.Dst, dst)
}
close(msg.ack)
}
func (tc *TraceChecker) ExpectNetListen(laddr string) {
var ev *xnet.TraceListen
msg := tc.xget1(&ev)
if ev.Laddr != laddr {
tc.t.Fatalf("net listen: have %v; want %v", ev.Laddr, laddr)
}
println("listen: ok")
close(msg.ack)
}
func (tc *TraceChecker) ExpectNetTx(src, dst string, pkt string) {
var ev *xnet.TraceTx
msg := tc.xget1(&ev)
pktb := []byte(pkt)
if !(ev.Src.String() == src &&
ev.Dst.String() == dst &&
bytes.Equal(ev.Pkt, pktb)) {
// TODO also print all (?) previous events
tc.t.Fatalf("expect:\nhave: %s -> %s %v\nwant: %s -> %s %v",
ev.Src, ev.Dst, ev.Pkt, src, dst, pktb)
}
close(msg.ack)
}
// M drives cluster with 1 S through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) {
net := pipenet.New("") // test network
tc := NewTraceChecker(t)
net := xnet.NetTrace(pipenet.New(""), tc) // test network
Maddr := "0"
Saddr := "1"
......@@ -64,7 +161,11 @@ func TestMasterStorage(t *testing.T) {
_ = err // XXX
})
println("222")
// expect:
//tc.ExpectNetListen("0")
tc.ExpectNetDial("0")
println("333")
// M.clusterState <- RECOVERY
// M.nodeTab <- Node(M)
......@@ -79,6 +180,9 @@ func TestMasterStorage(t *testing.T) {
// expect:
// M <- S .? RequestIdentification{...} + TODO test ID rejects
tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake
tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01")
// M -> S .? AcceptIdentification{...}
// M.nodeTab <- Node(S) XXX order can be racy?
// S.nodeTab <- Node(M) XXX order can be racy?
......
......@@ -34,8 +34,25 @@ import (
// only Tx events are traced:
// - because Write, contrary to Read, never writes partial data on non-error
// - because in case of pipenet tracing writes only is enough to get whole network exchange picture
func NetTrace(inner Network, trace func (t *TraceTx)) Network {
return &netTrace{inner, trace}
func NetTrace(inner Network, tracer Tracer) Network {
return &netTrace{inner, tracer}
}
// Tracer is the interface that needs to be implemented by network trace receivers
type Tracer interface {
TraceNetDial(*TraceDial)
TraceNetListen(*TraceListen)
TraceNetTx(*TraceTx)
}
// TraceDial is event corresponding to network dialing
type TraceDial struct {
Dst string
}
// TraceListen is event corresponding to network listening
type TraceListen struct {
Laddr string
}
// TraceTx is event corresponding to network transmission
......@@ -48,7 +65,7 @@ type TraceTx struct {
// it is wrapped with traceConn
type netTrace struct {
inner Network
trace func(t *TraceTx)
tracer Tracer
}
func (nt *netTrace) Network() string {
......@@ -56,19 +73,23 @@ func (nt *netTrace) Network() string {
}
func (nt *netTrace) Dial(ctx context.Context, addr string) (net.Conn, error) {
nt.tracer.TraceNetDial(&TraceDial{Dst: addr})
c, err := nt.inner.Dial(ctx, addr)
if err != nil {
return nil, err
}
return &traceConn{nt, c}, nil
// XXX +TraceNetDialPost ?
}
func (nt *netTrace) Listen(laddr string) (net.Listener, error) {
nt.tracer.TraceNetListen(&TraceListen{Laddr: laddr})
l, err := nt.inner.Listen(laddr)
if err != nil {
return nil, err
}
return &netTraceListener{nt, l}, nil
// XXX +TraceNetListenPost ?
}
// netTraceListener wraps net.Listener to wrap accepted connections with traceConn
......@@ -92,7 +113,7 @@ type traceConn struct {
}
func (tc *traceConn) Write(b []byte) (int, error) {
t := &TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b}
tc.nt.trace(t)
tc.nt.tracer.TraceNetTx(&TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b})
return tc.Conn.Write(b)
// XXX +TraceNetTxPost ?
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment