Commit 5adc637c authored by Kirill Smelkov's avatar Kirill Smelkov

X clusterState tracing + check

parent 30dfd90f
...@@ -688,7 +688,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) { ...@@ -688,7 +688,7 @@ func (nl *NodeLink) recvPkt() (*PktBuf, error) {
} }
} }
if dumpio { if /* XXX temp show only tx */ false && dumpio {
// XXX -> log // XXX -> log
fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt) fmt.Printf("%v < %v: %v\n", nl.peerLink.LocalAddr(), nl.peerLink.RemoteAddr(), pkt)
} }
......
//go:generate stringer -output zproto-str.go -type ErrorCode,NodeType proto.go //go:generate stringer -output zproto-str.go -type ErrorCode,NodeType proto.go
package neo package neo
// supporting code for types defined in proto.go
import ( import (
"fmt" "fmt"
...@@ -20,6 +21,13 @@ func (e *Error) Error() string { ...@@ -20,6 +21,13 @@ func (e *Error) Error() string {
} }
// Set sets cluster state value to v.
// Use Set instead of direct assignment for ClusterState tracing to work.
func (cs *ClusterState) Set(v ClusterState) {
*cs = v
traceClusterStateChanged(cs)
}
const nodeTypeChar = "MSCA4567" // keep in sync with NodeType constants const nodeTypeChar = "MSCA4567" // keep in sync with NodeType constants
// String returns string representation of a node uuid. // String returns string representation of a node uuid.
...@@ -46,7 +54,7 @@ func (nodeUUID NodeUUID) String() string { ...@@ -46,7 +54,7 @@ func (nodeUUID NodeUUID) String() string {
return s return s
} }
// XXX place ok? // UUID creates node uuid from node type and number.
// XXX test // XXX test
func UUID(typ NodeType, num int32) NodeUUID { func UUID(typ NodeType, num int32) NodeUUID {
temp := uint32(0) temp := uint32(0)
......
...@@ -51,6 +51,8 @@ const ( ...@@ -51,6 +51,8 @@ const (
INCOMPLETE_TRANSACTION INCOMPLETE_TRANSACTION
) )
//trace:event traceClusterStateChanged(cs *ClusterState)
type ClusterState int32 type ClusterState int32
const ( const (
// Once the primary master is elected, the cluster has a state, which is // Once the primary master is elected, the cluster has a state, which is
......
...@@ -71,19 +71,28 @@ func (t *MyTracer) TraceNetConnect(ev *xnet.TraceConnect) { t.Trace1(ev) } ...@@ -71,19 +71,28 @@ func (t *MyTracer) TraceNetConnect(ev *xnet.TraceConnect) { t.Trace1(ev) }
func (t *MyTracer) TraceNetListen(ev *xnet.TraceListen) { t.Trace1(ev) } func (t *MyTracer) TraceNetListen(ev *xnet.TraceListen) { t.Trace1(ev) }
func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Trace1(ev) } func (t *MyTracer) TraceNetTx(ev *xnet.TraceTx) {} // { t.Trace1(ev) }
type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg} //type traceNeoRecv struct {conn *neo.Conn; msg neo.Msg}
func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Trace1(&traceNeoRecv{c, msg}) } //func (t *MyTracer) traceNeoConnRecv(c *neo.Conn, msg neo.Msg) { t.Trace1(&traceNeoRecv{c, msg}) }
type traceNeoSend struct { type traceNeoSend struct {
Src, Dst net.Addr Src, Dst net.Addr
ConnID uint32 ConnID uint32
Msg neo.Msg Msg neo.Msg
} }
func (t *MyTracer) traceNeoConnSendPre(c *neo.Conn, msg neo.Msg) { func (t *MyTracer) traceNeoConnSendPre(c *neo.Conn, msg neo.Msg) {
t.Trace1(&traceNeoSend{c.Link().LocalAddr(), c.Link().RemoteAddr(), c.ConnID(), msg}) t.Trace1(&traceNeoSend{c.Link().LocalAddr(), c.Link().RemoteAddr(), c.ConnID(), msg})
} }
type traceNeoClusterState struct {
Ptr *neo.ClusterState // pointer to variable which holds the state
State neo.ClusterState
}
func (t *MyTracer) traceNeoClusterState(cs *neo.ClusterState) {
t.Trace1(&traceNeoClusterState{cs, *cs})
}
func clusterState(cs *neo.ClusterState, v neo.ClusterState) *traceNeoClusterState {
return &traceNeoClusterState{cs, v}
}
/* /*
func (tc *TraceChecker) ExpectNetDial(dst string) { func (tc *TraceChecker) ExpectNetDial(dst string) {
...@@ -146,6 +155,7 @@ func TestMasterStorage(t *testing.T) { ...@@ -146,6 +155,7 @@ func TestMasterStorage(t *testing.T) {
tracing.Lock() tracing.Lock()
//neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv) //neo_traceConnRecv_Attach(pg, tracer.traceNeoConnRecv)
neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre) neo_traceConnSendPre_Attach(pg, tracer.traceNeoConnSendPre)
neo_traceClusterStateChanged_Attach(pg, tracer.traceNeoClusterState)
tracing.Unlock() tracing.Unlock()
...@@ -172,11 +182,12 @@ func TestMasterStorage(t *testing.T) { ...@@ -172,11 +182,12 @@ func TestMasterStorage(t *testing.T) {
return &xnet.TraceListen{Laddr: xaddr(laddr)} return &xnet.TraceListen{Laddr: xaddr(laddr)}
} }
// XXX // shortcut for net tx event over nodelink connection
conntx := func(src, dst string, connid uint32, msg neo.Msg) *traceNeoSend { conntx := func(src, dst string, connid uint32, msg neo.Msg) *traceNeoSend {
return &traceNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg} return &traceNeoSend{Src: xaddr(src), Dst: xaddr(dst), ConnID: connid, Msg: msg}
} }
Mhost := xnet.NetTrace(net.Host("m"), tracer) Mhost := xnet.NetTrace(net.Host("m"), tracer)
Shost := xnet.NetTrace(net.Host("s"), tracer) Shost := xnet.NetTrace(net.Host("s"), tracer)
...@@ -191,11 +202,12 @@ func TestMasterStorage(t *testing.T) { ...@@ -191,11 +202,12 @@ func TestMasterStorage(t *testing.T) {
_ = err // XXX _ = err // XXX
}) })
// expect: // expect:
tc.Expect(netlisten("m:1")) tc.Expect(netlisten("m:1"))
// M.clusterState <- RECOVERY // XXX M.nodeTab <- Node(M)
// M.nodeTab <- Node(M) tc.Expect(clusterState(&M.clusterState, neo.ClusterRecovering))
// start storage // start storage
zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs") zstor := xfs1stor("../../zodb/storage/fs1/testdata/1.fs")
...@@ -224,7 +236,9 @@ func TestMasterStorage(t *testing.T) { ...@@ -224,7 +236,9 @@ func TestMasterStorage(t *testing.T) {
ClusterName: "abc1", ClusterName: "abc1",
IdTimestamp: 0, IdTimestamp: 0,
})) }))
// TODO check M adjust nodetab...
// XXX M.nodeTab <- Node(S1)
tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{ tc.Expect(conntx("m:2", "s:2", 1, &neo.AcceptIdentification{
NodeType: neo.MASTER, NodeType: neo.MASTER,
MyNodeUUID: neo.UUID(neo.MASTER, 1), MyNodeUUID: neo.UUID(neo.MASTER, 1),
...@@ -250,22 +264,12 @@ func TestMasterStorage(t *testing.T) { ...@@ -250,22 +264,12 @@ func TestMasterStorage(t *testing.T) {
RowList: []neo.RowInfo{}, RowList: []neo.RowInfo{},
})) }))
// XXX M.partTab <- ...
// XXX updated something cluster currently can be operational
// XXX temp // XXX temp
return return
// M.nodeTab <- Node(S) XXX order can be racy?
// M -> S .? AcceptIdentification{...}
// S.nodeTab <- Node(M) XXX order can be racy?
//
// ; storCtlRecovery
// M -> S .? Recovery
// S <- M .? AnswerRecovery
//
// M -> S .? AskPartitionTable
// S <- M .? AnswerPartitionTable
// M.partTab <- ... XXX
// XXX updated something cluster currently can be operational
err := M.Start() err := M.Start()
exc.Raiseif(err) exc.Raiseif(err)
......
...@@ -107,10 +107,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master { ...@@ -107,10 +107,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
nodeLeave: make(chan nodeLeave), nodeLeave: make(chan nodeLeave),
} }
m.node.MyInfo.NodeUUID = m.allocUUID(neo.MASTER) m.clusterState = -1 // invalid
// TODO update nodeTab with self
m.clusterState = neo.ClusterRecovering // XXX no elections - we are the only master
return m return m
} }
...@@ -142,7 +139,7 @@ func (m *Master) Shutdown() error { ...@@ -142,7 +139,7 @@ func (m *Master) Shutdown() error {
// setClusterState sets .clusterState and notifies subscribers // setClusterState sets .clusterState and notifies subscribers
func (m *Master) setClusterState(state neo.ClusterState) { func (m *Master) setClusterState(state neo.ClusterState) {
m.clusterState = state m.clusterState.Set(state)
// TODO notify subscribers // TODO notify subscribers
} }
...@@ -150,6 +147,9 @@ func (m *Master) setClusterState(state neo.ClusterState) { ...@@ -150,6 +147,9 @@ func (m *Master) setClusterState(state neo.ClusterState) {
// Run starts master node and runs it until ctx is cancelled or fatal error // Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) (err error) { func (m *Master) Run(ctx context.Context) (err error) {
m.node.MyInfo.NodeUUID = m.allocUUID(neo.MASTER)
// TODO update nodeTab with self
// start listening // start listening
l, err := m.node.Listen() l, err := m.node.Listen()
if err != nil { if err != nil {
......
...@@ -353,6 +353,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err ...@@ -353,6 +353,7 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
} }
// XXX move Mconn.Send here and ^^^ only prepare reply?
if err != nil { if err != nil {
return err return err
} }
......
...@@ -13,11 +13,14 @@ import ( ...@@ -13,11 +13,14 @@ import (
// traceimport: "lab.nexedi.com/kirr/neo/go/neo" // traceimport: "lab.nexedi.com/kirr/neo/go/neo"
// rerun "gotrace gen" if you see link failure ↓↓↓ // rerun "gotrace gen" if you see link failure ↓↓↓
//go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_d2fa0ebb37c3e2bf54309859a1eeb0e831edd435 //go:linkname neo_trace_exporthash lab.nexedi.com/kirr/neo/go/neo._trace_exporthash_46f45c4a2306b317d62d3cded6f5ec228f0cf669
func neo_trace_exporthash() func neo_trace_exporthash()
func init() { neo_trace_exporthash() } func init() { neo_trace_exporthash() }
//go:linkname neo_traceClusterStateChanged_Attach lab.nexedi.com/kirr/neo/go/neo.traceClusterStateChanged_Attach
func neo_traceClusterStateChanged_Attach(*tracing.ProbeGroup, func(cs *neo.ClusterState)) *tracing.Probe
//go:linkname neo_traceConnRecv_Attach lab.nexedi.com/kirr/neo/go/neo.traceConnRecv_Attach //go:linkname neo_traceConnRecv_Attach lab.nexedi.com/kirr/neo/go/neo.traceConnRecv_Attach
func neo_traceConnRecv_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg)) *tracing.Probe func neo_traceConnRecv_Attach(*tracing.ProbeGroup, func(c *neo.Conn, msg neo.Msg)) *tracing.Probe
......
...@@ -8,6 +8,33 @@ import ( ...@@ -8,6 +8,33 @@ import (
"unsafe" "unsafe"
) )
// traceevent: traceClusterStateChanged(cs *ClusterState)
type _t_traceClusterStateChanged struct {
tracing.Probe
probefunc func(cs *ClusterState)
}
var _traceClusterStateChanged *_t_traceClusterStateChanged
func traceClusterStateChanged(cs *ClusterState) {
if _traceClusterStateChanged != nil {
_traceClusterStateChanged_run(cs)
}
}
func _traceClusterStateChanged_run(cs *ClusterState) {
for p := _traceClusterStateChanged; p != nil; p = (*_t_traceClusterStateChanged)(unsafe.Pointer(p.Next())) {
p.probefunc(cs)
}
}
func traceClusterStateChanged_Attach(pg *tracing.ProbeGroup, probe func(cs *ClusterState)) *tracing.Probe {
p := _t_traceClusterStateChanged{probefunc: probe}
tracing.AttachProbe(pg, (**tracing.Probe)(unsafe.Pointer(&_traceClusterStateChanged)), &p.Probe)
return &p.Probe
}
// traceevent: traceConnRecv(c *Conn, msg Msg) // traceevent: traceConnRecv(c *Conn, msg Msg)
type _t_traceConnRecv struct { type _t_traceConnRecv struct {
...@@ -63,4 +90,4 @@ func traceConnSendPre_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg ...@@ -63,4 +90,4 @@ func traceConnSendPre_Attach(pg *tracing.ProbeGroup, probe func(c *Conn, msg Msg
} }
// trace export signature // trace export signature
func _trace_exporthash_d2fa0ebb37c3e2bf54309859a1eeb0e831edd435() {} func _trace_exporthash_46f45c4a2306b317d62d3cded6f5ec228f0cf669() {}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment