Commit 74daaca9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 864e62ae
......@@ -273,7 +273,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
}
}()
err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
// XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry?
reply := proto.AnswerLastTransaction{}
......@@ -302,7 +302,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// Retrieve storages we might need to access.
storv := make([]*xneo.PeerNode, 0, 1)
err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
for _, cell := range cs.PartTab.Get(xid.Oid) {
if cell.Readable() {
stor := cs.NodeTab.Get(cell.NID)
......
......@@ -1247,7 +1247,7 @@ func (p *_MasteredPeer) run(ctx context.Context, f func() error) error {
}
}
// accept sends accept reply and pushes inital state0 snapshot to peer.
// accept sends accept reply and pushes initial state0 snapshot to peer.
func (p *_MasteredPeer) accept(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "accept %s", p.node.NID)(&err)
......
......@@ -64,7 +64,7 @@ type _MasteredNode struct {
// being operational means:
// - link to master established and is ok
// - state of partTab/nodeTab is operational
opMu sync.RWMutex
opMu sync.RWMutex // XXX -> stateMu
mlink *neonet.NodeLink // link to master
opReady chan struct{} // reinitialized each time state becomes non-operational
operational bool // cache for Node.State.IsOperational()
......@@ -104,6 +104,8 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
//
// f is called on every reconnection to master after identification and protocol prologue.
//
// TalkMaster should be the only mutator of cluster state and .MyInfo.NID.
//
// See top-level _MasteredNode overview for details.
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// start logging with initial NID (that might be temporary, and which master can tell us to change)
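For context, a rough sketch of how a node is expected to drive this loop; the callback signature is taken from the declaration above, everything else here is an assumption:

// sketch (assumed usage): run the master talk loop for the node's whole lifetime;
// f is re-invoked after every successful (re)identification to the master.
func runNodeTalk(ctx context.Context, node *_MasteredNode) error {
	return node.TalkMaster(ctx, func(ctx context.Context, mlink *_MasterLink) error {
		// serve this node's role over mlink until ctx is canceled or the
		// link breaks; TalkMaster then redials the master and calls f again.
		<-ctx.Done()
		return ctx.Err()
	})
}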
......@@ -299,6 +301,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
return δpt, err
}
// XXX updateState
// updateOperational calls δf under .opMu and updates .operational from current state.
// After .opMu unlock it also notifies those who were waiting for .operational to become y.
......@@ -334,9 +337,16 @@ func (node *_MasteredNode) updateOperational(δf func()) {
}
}
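For orientation, a minimal sketch of the notifying side described above; the field names (.opMu, .opReady, .operational) and the relation to State.IsOperational come from this diff, while the exact body is assumed:

// sketch (assumed body): apply δf under .opMu, recompute the .operational cache,
// and wake waiters only after the lock is released.
func (node *_MasteredNode) updateOperational(δf func()) {
	node.opMu.Lock()
	δf()

	wasOperational := node.operational
	node.operational = node.State.IsOperational()

	var opready chan struct{}
	switch {
	case !wasOperational && node.operational:
		opready = node.opReady // will be closed to notify waiters
	case wasOperational && !node.operational:
		node.opReady = make(chan struct{}) // reinitialize for the next wait round
	}
	node.opMu.Unlock()

	if opready != nil {
		close(opready)
	}
}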
// WithOperational runs f during when cluster state is/becomes operational.
// WithState runs f with the guarantee that cluster state is not changed during the run.
func (node *_MasteredNode) WithState(f func(cs *xneo.ClusterState) error) error {
node.opMu.RLock()
defer node.opMu.RUnlock()
return f(&node.State)
}
// WithOperationalState runs f when the cluster state is, or becomes, operational.
// The cluster state is guaranteed not to change while f runs.
func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error) error {
func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error) error {
for {
node.opMu.RLock()
if node.operational {
......
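The waiting side, of which only the head is visible in the hunk above, could then look roughly like this (a sketch; the retry-on-opReady structure and the final f call are assumptions based on the surrounding comments):

// sketch (assumed continuation of the loop shown above)
func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error) error {
	for {
		node.opMu.RLock()
		if node.operational {
			break // leave with .opMu read-locked
		}
		ready := node.opReady
		node.opMu.RUnlock()

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ready:
			// opReady was closed by updateOperational - relock and recheck
		}
	}

	// here the state is operational and .opMu is still read-locked,
	// so the state cannot change while f runs
	defer node.opMu.RUnlock()
	return f(node.mlink, &node.State)
}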
......@@ -60,6 +60,8 @@ import (
// NOTE once a node has been added to NodeTable its entry is never deleted: if
// e.g. a connection to a node is lost, the associated entry is marked as having
// DOWN (XXX or UNKNOWN ?) node state.
//
// NOTE the caller must take care of locking when modifying or accessing a NodeTable.
type NodeTable struct {
// XXX for PeerNode.Dial to work. see also comments vvv near "peer link"
localNode *Node
......@@ -75,11 +77,15 @@ type NodeTable struct {
//
// - peer's node info (nid, laddr, state, ...), and
// - link to the peer.
//
// NOTE the caller must make sure that the local node state is not changed
// while PeerNode data is accessed or Dial is called.
type PeerNode struct {
nodeTab *NodeTable // we are peer of .nodeTab.localNode
proto.NodeInfo // (.type, .laddr, .nid, .state, .idtime) XXX also protect by mu?
// XXX -> *peerLink
localNode *Node // link is for peer of .localNode
linkMu sync.Mutex
link *neonet.NodeLink // link to this peer; nil if not connected
dialT time.Time // last dial finished at this time
......@@ -159,14 +165,20 @@ func (nt *NodeTable) String() string {
buf := bytes.Buffer{}
for _, n := range nt.nodev {
// XXX recheck output
// XXX +link ?
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\t@ %s\n", n.NID, n.Type, n.State, n.Addr, n.IdTime)
}
return buf.String()
}
// Clone creates a new instance of NodeTable initialized with the previous content.
func (nt *NodeTable) Clone() *NodeTable {
	nt_ := &NodeTable{localNode: nt.localNode}
	nt_.nodev = append([]*PeerNode(nil), nt.nodev...)
	return nt_
}
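A usage note for Clone: it copies the nodev slice but not the *PeerNode entries themselves, so the result is a snapshot of the table layout, not of per-peer data. A hypothetical illustration (it would live inside package xneo; proto.DOWN stands for the DOWN state mentioned above):

// hypothetical illustration of Clone's shallow-copy semantics
func exampleNodeTableClone() {
	a := &NodeTable{}
	a.nodev = append(a.nodev, &PeerNode{})

	b := a.Clone()
	b.nodev = append(b.nodev, &PeerNode{}) // grows only b; a keeps one entry
	b.nodev[0].State = proto.DOWN          // *PeerNode is shared -> also visible via a.nodev[0]

	fmt.Println(len(a.nodev), len(b.nodev)) // 1 2
}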
// ---- peer link ----
// TODO review peer link dialing / setting / accepting.
......
......@@ -224,7 +224,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
}
// ---- encode / decode PT to / from messages
// ---- encode / decode PT to / from messages ----
func (pt *PartitionTable) String() string {
buf := &bytes.Buffer{}
......@@ -287,3 +287,6 @@ func PartTabFromMsg(msg *proto.SendPartitionTable) *PartitionTable {
return pt
}
// ----------------------------------------
......@@ -29,6 +29,32 @@ import (
"lab.nexedi.com/kirr/neo/go/neo/proto"
)
// NodeState represents the state maintained on a Node.
type NodeState struct {
NID proto.NodeID // nid received by node from M
IdTime proto.IdTime // when this node last identified to master
NodeTab *NodeTable // nodes in the cluster
PartTab *PartitionTable // on which nodes which data is located
ClusterState proto.ClusterState // code for cluster state
}
// Clone creates a new instance of NodeState initialized with the previous content.
func (s *NodeState) Clone() *NodeState {
	return &NodeState{
		NID:          s.NID,
		IdTime:       s.IdTime,
		ClusterState: s.ClusterState,
		// TODO ? if cloning full nodeTab/partTab becomes slow (for large
		// tables) -> change to create COW reference.
		NodeTab: s.NodeTab.Clone(),
		PartTab: s.PartTab.Clone(),
	}
}
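The point of cloning the nested tables too is that a NodeState snapshot stays stable while the live state keeps changing. A sketch, assuming PartitionTable has the Clone method implied by the body above:

// hypothetical illustration (inside package xneo)
func exampleNodeStateClone() {
	s := &NodeState{NodeTab: &NodeTable{}, PartTab: &PartitionTable{}}
	snap := s.Clone()

	// the live state keeps changing...
	s.NodeTab.nodev = append(s.NodeTab.nodev, &PeerNode{})

	// ...while the snapshot keeps what it had at Clone time
	fmt.Println(len(s.NodeTab.nodev), len(snap.NodeTab.nodev)) // 1 0
}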
/*
// ClusterState represent state of a cluster.
type ClusterState struct {
NodeTab *NodeTable // information about nodes in the cluster
......@@ -36,9 +62,10 @@ type ClusterState struct {
Code proto.ClusterState // master idea about cluster state
}
// XXX -> inline? (used only in 1 place)
func (cs *ClusterState) IsOperational() bool {
// XXX py client does not wait for cluster state==RUNNING
return /* cs.Code == proto.ClusterRunning && */ cs.PartTab.OperationalWith(cs.NodeTab)
return /* cs.Code == proto.ClusterRunning && * / cs.PartTab.OperationalWith(cs.NodeTab)
}
// ClusterStateSnapshot is snapshot of ClusterState.
......@@ -65,6 +92,7 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
return S
}
*/
// Node provides base functionality underlying implementation of any NEO node.
......@@ -76,7 +104,9 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
// - current partition table (how data is split around storage nodes),
// - current cluster state.
type Node struct {
MyInfo proto.NodeInfo // type, laddr, nid, state, idtime
// MyInfo proto.NodeInfo // type, laddr, nid, state, idtime XXX nid is managed by M
MyState NodeState
// XXX type, laddr
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry
......