Commit f5c1174f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 2510d03b
......@@ -19,10 +19,10 @@ package server
// test interaction between nodes
import (
"bytes"
//"bytes"
"context"
//"io"
"reflect"
//"reflect"
"testing"
//"../../neo/client"
......@@ -33,6 +33,7 @@ import (
"../../xcommon/xnet"
"../../xcommon/xnet/pipenet"
"../../xcommon/xsync"
"../../xcommon/xtesting"
"lab.nexedi.com/kirr/go123/exc"
......@@ -54,59 +55,18 @@ func xfs1stor(path string) *fs1.FileStorage {
}
// traceMsg represents one tracing communication
// the goroutine which produced it will wait for send on ack before continue
type traceMsg struct {
event interface {} // xnet.Trace* | ...
ack chan struct{}
// XXX tracer which can collect tracing events from net + TODO master/storage/etc...
// XXX naming
type MyTracer struct {
*xtesting.SyncTracer
}
// TraceChecker synchronously collects and checks tracing events
// it collects events from several sources and sends them all into one channel
// for each event the goroutine which produced it will wait for ack before continue
type TraceChecker struct {
t *testing.T
msgch chan *traceMsg // XXX naming -> tracech ?
}
func NewTraceChecker(t *testing.T) *TraceChecker {
return &TraceChecker{t: t, msgch: make(chan *traceMsg)}
}
// get1 gets 1 event in place and checks it has expected type
func (tc *TraceChecker) xget1(eventp interface{}) *traceMsg {
tc.t.Helper()
func (t *MyTracer) TraceNetDial(ev *xnet.TraceDial) { t.Trace1(ev) }
func (t *MyTracer) TraceNetListen(ev *xnet.TraceListen) { t.Trace1(ev) }
func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) { t.Trace1(ev) }
//println("xget1: entry")
msg := <-tc.msgch
//println("xget1: msg", msg)
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.event) {
tc.t.Fatalf("expected %s; got %#v", reventp.Elem().Type(), msg.event)
}
// *eventp = msg.event
reventp.Elem().Set(reflect.ValueOf(msg.event))
return msg
}
// trace1 sends message with one tracing event to consumer
func (tc *TraceChecker) trace1(event interface{}) {
ack := make(chan struct{})
//fmt.Printf("I: %#v ...", event)
tc.msgch <- &traceMsg{event, ack}
<-ack
//fmt.Printf(" ok\n")
}
func (tc *TraceChecker) TraceNetDial(ev *xnet.TraceDial) { tc.trace1(ev) }
func (tc *TraceChecker) TraceNetListen(ev *xnet.TraceListen) { tc.trace1(ev) }
func (tc *TraceChecker) TraceNetTx(ev *xnet.TraceTx) { tc.trace1(ev) }
// Expect instruct checker to expect next event to be ...
// XXX
/*
func (tc *TraceChecker) ExpectNetDial(dst string) {
tc.t.Helper()
......@@ -150,12 +110,14 @@ func (tc *TraceChecker) ExpectNetTx(src, dst string, pkt string) {
close(msg.ack)
}
*/
// M drives cluster with 1 S through recovery -> verification -> service -> shutdown
func TestMasterStorage(t *testing.T) {
tc := NewTraceChecker(t)
net := xnet.NetTrace(pipenet.New(""), tc) // test network
tracer := &MyTracer{xtesting.NewSyncTracer()}
tc := xtesting.NewTraceChecker(t, tracer.SyncTracer)
net := xnet.NetTrace(pipenet.New(""), tracer) // test network
Maddr := "0"
Saddr := "1"
......@@ -171,7 +133,9 @@ func TestMasterStorage(t *testing.T) {
})
// expect:
tc.ExpectNetListen("0")
//tc.ExpectNetListen("0")
tc.Expect(&xnet.TraceListen{Laddr: "0"})
// M.clusterState <- RECOVERY
// M.nodeTab <- Node(M)
......@@ -185,10 +149,21 @@ func TestMasterStorage(t *testing.T) {
})
// expect:
tc.ExpectNetListen("1")
tc.ExpectNetDial("0")
tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake
tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01")
//tc.ExpectNetListen("1")
tc.Expect(&xnet.TraceListen{Laddr: "1"})
tc.Expect(&xnet.TraceDial{Dst: "0"})
//tc.ExpectNetDial("0")
//tc.Expect(xnet.NetTx{Src: "2c", Dst: "2s", Pkt: []byte("\x00\x00\x00\x01")})
//tc.Expect(nettx("2c", "2s", "\x00\x00\x00\x01"))
// handshake
//tc.ExpectNetTx("2c", "2s", "\x00\x00\x00\x01") // handshake
//tc.ExpectNetTx("2s", "2c", "\x00\x00\x00\x01")
tc.ExpectPar(
&xnet.TraceTx{Src: "2c", Dst: "2s", Pkt: []byte("\x00\x00\x00\x01")},
&xnet.TraceTx{Src: "2s", Dst: "2c", Pkt: []byte("\x00\x00\x00\x01")},)
// XXX temp
return
......
......@@ -43,7 +43,7 @@ package pipenet
import (
"context"
"errors"
"fmt"
//"fmt"
"net"
"strconv"
"sync"
......@@ -60,8 +60,10 @@ var (
// Addr represents address of a pipenet endpoint
type Addr struct {
network string // full network name, e.g. "pipe"
addr string // port + c/s depending on connection endpoint
Net string // full network name, e.g. "pipe"
Port int // -1, if anonymous
Endpoint int // 0 (client) | 1 (server) | -1 (listening)
//Addr string // port + c/s depending on connection endpoint
}
// Network implements synchronous in-memory network of pipes
......@@ -121,8 +123,9 @@ func New(name string) *Network {
// Once listener is started, Dials could connect to listening address.
// Connection requests created by Dials could be accepted via Accept.
func (n *Network) Listen(laddr string) (net.Listener, error) {
var netladdr net.Addr
lerr := func(err error) error {
return &net.OpError{Op: "listen", Net: n.Network(), Addr: &Addr{n.Network(), laddr}, Err: err}
return &net.OpError{Op: "listen", Net: n.Network(), Addr: netladdr, Err: err}
}
// laddr must be empty or int >= 0
......@@ -130,10 +133,12 @@ func (n *Network) Listen(laddr string) (net.Listener, error) {
if laddr != "" {
port, err := strconv.Atoi(laddr)
if err != nil || port < 0 {
return nil, lerr(errBadAddress) // XXX -> net.AddrError ?
return nil, lerr(&net.AddrError{Err: "invalid", Addr: laddr})
}
}
netladdr = &Addr{n.Network(), port, -1}
n.mu.Lock()
defer n.mu.Unlock()
......@@ -218,15 +223,18 @@ func (l *listener) Accept() (net.Conn, error) {
// Dial dials address on the network
// It tries to connect to Accept called on listener corresponding to addr.
func (n *Network) Dial(ctx context.Context, addr string) (net.Conn, error) {
var netaddr net.Addr
derr := func(err error) error {
return &net.OpError{Op: "dial", Net: n.Network(), Addr: &Addr{n.Network(), addr}, Err: err}
return &net.OpError{Op: "dial", Net: n.Network(), Addr: netaddr, Err: err}
}
port, err := strconv.Atoi(addr)
if err != nil || port < 0 {
return nil, derr(errBadAddress) // XXX -> net.AddrError ?
return nil, derr(&net.AddrError{Err: "invalid", Addr: addr})
}
netaddr = &Addr{n.Network(), port, -1}
n.mu.Lock()
if port >= len(n.entryv) {
......@@ -284,7 +292,7 @@ func (c *conn) Close() (err error) {
// whether pipe endpoint was created via Dial or Accept.
func (c *conn) LocalAddr() net.Addr {
addr := c.entry.addr()
addr.addr += string("cs"[c.endpoint])
addr.Endpoint = c.endpoint
return addr
}
......@@ -292,7 +300,7 @@ func (c *conn) LocalAddr() net.Addr {
// it is entry address + "c" or "s" suffix -- see LocalAddr for details
func (c *conn) RemoteAddr() net.Addr {
addr := c.entry.addr()
addr.addr += string("sc"[c.endpoint])
addr.Endpoint = (c.endpoint + 1) % 2
return addr
}
......@@ -328,11 +336,17 @@ func (e *entry) empty() bool {
// addr returns address corresponding to entry
func (e *entry) addr() *Addr {
return &Addr{network: e.network.Network(), addr: fmt.Sprintf("%d", e.port)}
return &Addr{Net: e.network.Network(), Port: e.port, Endpoint: -1}
}
func (a *Addr) Network() string { return a.network }
func (a *Addr) String() string { return a.addr }
func (a *Addr) Network() string { return a.Net }
func (a *Addr) String() string {
addr := strconv.Itoa(a.Port)
if a.Endpoint >= 0 {
addr += string("cs"[a.Endpoint])
}
return addr
}
// Addr returns address where listener is accepting incoming connections
func (l *listener) Addr() net.Addr {
......
......@@ -88,26 +88,39 @@ func assertEq(t *testing.T, a, b interface{}) {
func TestPipeNet(t *testing.T) {
pnet := New("α")
addrtestv := []struct {port, endpoint int; want string} {
{0, -1, "0"},
{1, 0, "1c"},
{2, 1, "2s"},
}
for _, tt := range addrtestv {
addr := &Addr{Net: "pipeβ", Port: tt.port, Endpoint: tt.endpoint}
have := addr.String()
if have != tt.want {
t.Errorf("%#v -> %q ; want %q", addr, have, tt.want)
}
}
_, err := pnet.Dial(context.Background(), "0")
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", "0"}, Err: errConnRefused})
assertEq(t, err, &net.OpError{Op: "dial", Net: "pipeα", Addr: &Addr{"pipeα", 0, -1}, Err: errConnRefused})
l1 := xlisten(pnet, "")
assertEq(t, l1.Addr(), &Addr{"pipeα", "0"})
assertEq(t, l1.Addr(), &Addr{"pipeα", 0, -1})
// XXX -> use workGroup (in connection_test.go)
wg := &errgroup.Group{}
wg.Go(func() error {
return exc.Runx(func() {
c1s := xaccept(l1)
assertEq(t, c1s.LocalAddr(), &Addr{"pipeα", "1s"})
assertEq(t, c1s.RemoteAddr(), &Addr{"pipeα", "1c"})
assertEq(t, c1s.LocalAddr(), &Addr{"pipeα", 1, 1})
assertEq(t, c1s.RemoteAddr(), &Addr{"pipeα", 1, 0})
assertEq(t, xread(c1s), "ping")
xwrite(c1s, "pong")
c2s := xaccept(l1)
assertEq(t, c2s.LocalAddr(), &Addr{"pipeα", "2s"})
assertEq(t, c2s.RemoteAddr(), &Addr{"pipeα", "2c"})
assertEq(t, c2s.LocalAddr(), &Addr{"pipeα", 2, 1})
assertEq(t, c2s.RemoteAddr(), &Addr{"pipeα", 2, 0})
assertEq(t, xread(c2s), "hello")
xwrite(c2s, "world")
......@@ -115,15 +128,15 @@ func TestPipeNet(t *testing.T) {
})
c1c := xdial(pnet, "0")
assertEq(t, c1c.LocalAddr(), &Addr{"pipeα", "1c"})
assertEq(t, c1c.RemoteAddr(), &Addr{"pipeα", "1s"})
assertEq(t, c1c.LocalAddr(), &Addr{"pipeα", 1, 0})
assertEq(t, c1c.RemoteAddr(), &Addr{"pipeα", 1, 1})
xwrite(c1c, "ping")
assertEq(t, xread(c1c), "pong")
c2c := xdial(pnet, "0")
assertEq(t, c2c.LocalAddr(), &Addr{"pipeα", "2c"})
assertEq(t, c2c.RemoteAddr(), &Addr{"pipeα", "2s"})
assertEq(t, c2c.LocalAddr(), &Addr{"pipeα", 2, 0})
assertEq(t, c2c.RemoteAddr(), &Addr{"pipeα", 2, 1})
xwrite(c2c, "hello")
assertEq(t, xread(c2c), "world")
......@@ -131,5 +144,5 @@ func TestPipeNet(t *testing.T) {
xwait(wg)
l2 := xlisten(pnet, "")
assertEq(t, l2.Addr(), &Addr{"pipeα", "3"})
assertEq(t, l2.Addr(), &Addr{"pipeα", 3, -1})
}
// Copyright (C) 2017 Nexedi SA and Contributors.
// Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Open Source Initiative approved licenses and Convey
// the resulting work. Corresponding source of such a combination shall include
// the source code for all other software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// Package xtesting provides addons to std package testing
package xtesting
import (
"fmt"
"reflect"
"strings"
"testing"
)
// TODO tests for this
// XXX Tracer interface {Trace1} ?
// SyncTracer provides base infrastructure for synchronous tracing
//
// Tracing events from several sources could be collected and sent for consumption via 1 channel.
// For each event the goroutine which produced it will wait for ack before continue.
type SyncTracer struct {
tracech chan *SyncTraceMsg
}
// SyncTraceMsg represents message with 1 synchronous tracing communication
// the goroutine which produced the message will wait for send on Ack before continue.
type SyncTraceMsg struct {
Event interface {}
Ack chan<- struct{}
}
// XXX doc
func NewSyncTracer() *SyncTracer {
return &SyncTracer{tracech: make(chan *SyncTraceMsg)}
}
// Trace1 sends message with 1 tracing event to a consumer and waits for ack
func (st *SyncTracer) Trace1(event interface{}) {
ack := make(chan struct{})
st.tracech <- &SyncTraceMsg{event, ack}
<-ack
}
// Get1 receives message with 1 tracing event from a producer
// The consumer, after dealing with the message, must send back an ack.
func (st *SyncTracer) Get1() *SyncTraceMsg {
return <-st.tracech
}
// TraceChecker synchronously collects and checks tracing events from a SyncTracer
type TraceChecker struct {
t *testing.T
st *SyncTracer
}
// XXX doc
func NewTraceChecker(t *testing.T, st *SyncTracer) *TraceChecker {
return &TraceChecker{t: t, st: st}
}
// get1 gets 1 event in place and checks it has expected type
// if checks do not pass - fatal testing error is raised
// XXX merge back to expect1 ?
func (tc *TraceChecker) xget1(eventp interface{}) *SyncTraceMsg {
tc.t.Helper()
msg := tc.st.Get1()
reventp := reflect.ValueOf(eventp)
if reventp.Type().Elem() != reflect.TypeOf(msg.Event) {
tc.t.Fatalf("expect: %s: got %#v", reventp.Elem().Type(), msg.Event)
}
// *eventp = msg.Event
reventp.Elem().Set(reflect.ValueOf(msg.Event))
return msg
}
// expect1 asks checker to expect next event to be eventExpect (both type and value)
// if checks do not pass - fatal testing error is raised
// XXX merge back to expect?
func (tc *TraceChecker) expect1(eventExpect interface{}) {
tc.t.Helper()
reventExpect := reflect.ValueOf(eventExpect)
reventp := reflect.New(reventExpect.Type())
msg := tc.xget1(reventp.Interface())
revent := reventp.Elem()
if !reflect.DeepEqual(revent.Interface(), reventExpect.Interface()) {
tc.t.Fatalf("expect: %s:\nhave: %v\nwant: %v", reventExpect.Type(), revent, reventExpect)
}
close(msg.Ack)
}
// Expect asks checker to expect next series of events to be from eventExpectV in specified order
func (tc *TraceChecker) Expect(eventExpectV ...interface{}) {
tc.t.Helper()
for _, eventExpect := range eventExpectV {
tc.expect1(eventExpect)
}
}
// ExpectPar asks checker to expect next series of events to be from eventExpectV in no particular order
// XXX naming
func (tc *TraceChecker) ExpectPar(eventExpectV ...interface{}) {
tc.t.Helper()
loop:
for len(eventExpectV) > 0 {
msg := tc.st.Get1()
for i, eventExpect := range eventExpectV {
if !reflect.DeepEqual(msg.Event, eventExpect) {
continue
}
// found matching event - good
eventExpectV = append(eventExpectV[:i], eventExpectV[i+1:]...)
close(msg.Ack)
continue loop
}
// matching event not found - bad
strv := []string{}
for _, e := range eventExpectV {
strv = append(strv, fmt.Sprintf("%v", e))
}
tc.t.Fatalf("expect: have: %v\nwant: [%v]", msg.Event, strings.Join(strv, " | "))
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment