Commit 9a2b15fc authored by Kirill Smelkov's avatar Kirill Smelkov

xnet: Teach Tracer to enable/disable delivery of trace events at runtime.

Some NEO/go tests need to start a cluster under tracing and then to
continue running test workload with tracing disabled.
parent 39c1e8c9
...@@ -23,6 +23,7 @@ package xnet ...@@ -23,6 +23,7 @@ package xnet
import ( import (
"context" "context"
"net" "net"
"sync/atomic"
) )
// NetTrace wraps underlying networker with IO tracing layer. // NetTrace wraps underlying networker with IO tracing layer.
...@@ -42,7 +43,7 @@ import ( ...@@ -42,7 +43,7 @@ import (
// //
// WARNING NetTrace functionality is currently very draft. // WARNING NetTrace functionality is currently very draft.
func NetTrace(inner Networker, tracerx TraceReceiver) *Tracer { func NetTrace(inner Networker, tracerx TraceReceiver) *Tracer {
return &Tracer{inner, tracerx} return &Tracer{inner, tracerx, 1}
} }
// TraceReceiver is the interface that needs to be implemented by network trace receivers. // TraceReceiver is the interface that needs to be implemented by network trace receivers.
...@@ -86,6 +87,21 @@ type TraceTx struct { ...@@ -86,6 +87,21 @@ type TraceTx struct {
type Tracer struct { type Tracer struct {
inner Networker inner Networker
rx TraceReceiver rx TraceReceiver
on int32 // atomic (tracing can be enabled/disabled at runtime)
}
// TraceOn tells the tracer to (re)enable delivery of trace events.
func (t *Tracer) TraceOn() {
atomic.StoreInt32(&t.on, 1)
}
// TraceOff tells tracer to disable delivery of trace events.
func (t *Tracer) TraceOff() {
atomic.StoreInt32(&t.on, 0)
}
func (t *Tracer) enabled() bool {
return (atomic.LoadInt32(&t.on) != 0)
} }
// Network implements Networker. // Network implements Networker.
...@@ -106,12 +122,16 @@ func (t *Tracer) Close() error { ...@@ -106,12 +122,16 @@ func (t *Tracer) Close() error {
// Dial implements Networker. // Dial implements Networker.
func (t *Tracer) Dial(ctx context.Context, addr string) (net.Conn, error) { func (t *Tracer) Dial(ctx context.Context, addr string) (net.Conn, error) {
if t.enabled() {
t.rx.TraceNetDial(&TraceDial{Dialer: t.inner.Name(), Addr: addr}) t.rx.TraceNetDial(&TraceDial{Dialer: t.inner.Name(), Addr: addr})
}
c, err := t.inner.Dial(ctx, addr) c, err := t.inner.Dial(ctx, addr)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if t.enabled() {
t.rx.TraceNetConnect(&TraceConnect{Src: c.LocalAddr(), Dst: c.RemoteAddr(), Dialed: addr}) t.rx.TraceNetConnect(&TraceConnect{Src: c.LocalAddr(), Dst: c.RemoteAddr(), Dialed: addr})
}
return &traceConn{t, c}, nil return &traceConn{t, c}, nil
} }
...@@ -122,7 +142,9 @@ func (t *Tracer) Listen(ctx context.Context, laddr string) (Listener, error) { ...@@ -122,7 +142,9 @@ func (t *Tracer) Listen(ctx context.Context, laddr string) (Listener, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
if t.enabled() {
t.rx.TraceNetListen(&TraceListen{Laddr: l.Addr()}) t.rx.TraceNetListen(&TraceListen{Laddr: l.Addr()})
}
return &netTraceListener{t, l}, nil return &netTraceListener{t, l}, nil
} }
...@@ -150,7 +172,9 @@ func (tc *traceConn) Write(b []byte) (int, error) { ...@@ -150,7 +172,9 @@ func (tc *traceConn) Write(b []byte) (int, error) {
// XXX +TraceNetTxPre ? // XXX +TraceNetTxPre ?
n, err := tc.Conn.Write(b) n, err := tc.Conn.Write(b)
if err == nil { if err == nil {
if tc.t.enabled() {
tc.t.rx.TraceNetTx(&TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b}) tc.t.rx.TraceNetTx(&TraceTx{Src: tc.LocalAddr(), Dst: tc.RemoteAddr(), Pkt: b})
} }
}
return n, err return n, err
} }
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment