Commit c895386c authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c2db00b1
...@@ -262,7 +262,9 @@ func (c *Client) flushEventq0() { ...@@ -262,7 +262,9 @@ func (c *Client) flushEventq0() {
// Sync implements zodb.IStorageDriver. // Sync implements zodb.IStorageDriver.
func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
ctx = taskctx.Runningf(ctx, "%s: zsync", c.node.MyInfo.NID) // XXX mynid locking c.WithState/*XXX Snapshot ?*/(func(s *xneo.State) {
ctx = taskctx.Runningf(ctx, "%s: zsync", s.MyNID)
})
if glog.V(2) { if glog.V(2) {
task.TraceBegin(ctx) task.TraceBegin(ctx)
defer func() { task.TraceEnd(ctx, err) }() defer func() { task.TraceEnd(ctx, err) }()
...@@ -273,7 +275,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -273,7 +275,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
} }
}() }()
err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error { err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, _ *xneo.State) error {
// XXX mlink can become down while we are making the call. // XXX mlink can become down while we are making the call.
// XXX do we want to return error or retry? // XXX do we want to return error or retry?
reply := proto.AnswerLastTransaction{} reply := proto.AnswerLastTransaction{}
...@@ -289,7 +291,9 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -289,7 +291,9 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
// Load implements zodb.IStorageDriver. // Load implements zodb.IStorageDriver.
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) { func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
ctx = taskctx.Runningf(ctx, "%s: zload %s", c.node.MyInfo.NID, xid) // XXX mynid locking c.WithState(func(s *xneo.State) {
ctx = taskctx.Runningf(ctx, "%s: zload %s", s.MyNID, xid)
})
if glog.V(2) { if glog.V(2) {
task.TraceBegin(ctx) task.TraceBegin(ctx)
defer func() { task.TraceEnd(ctx, err) }() defer func() { task.TraceEnd(ctx, err) }()
...@@ -302,10 +306,10 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -302,10 +306,10 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// Retrieve storages we might need to access. // Retrieve storages we might need to access.
storv := make([]*xneo.PeerNode, 0, 1) storv := make([]*xneo.PeerNode, 0, 1)
err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error { err = c.node.WithOperationalState(ctx, func(mlink *neonet.NodeLink, s *xneo.State) error {
for _, cell := range cs.PartTab.Get(xid.Oid) { for _, cell := range s.PartTab.Get(xid.Oid) {
if cell.Readable() { if cell.Readable() {
stor := cs.NodeTab.Get(cell.NID) stor := s.NodeTab.Get(cell.NID)
// this storage might not yet come up // this storage might not yet come up
if stor != nil && stor.State == proto.RUNNING { if stor != nil && stor.State == proto.RUNNING {
storv = append(storv, stor) storv = append(storv, stor)
......
...@@ -154,7 +154,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -154,7 +154,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
return xio.WithCloseOnRetCancel(ctx, mlink, func() (err error) { return xio.WithCloseOnRetCancel(ctx, mlink, func() (err error) {
if accept.YourNID != node.MyInfo.NID { if accept.YourNID != node.MyInfo.NID {
log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID) log.Infof(ctx, "master %s told us to be %s", accept.MyNID, accept.YourNID)
node.MyInfo.NID = accept.YourNID node.MyInfo.NID = accept.YourNID // XXX .updateState
} }
// TODO verify Mnid = M*; our nid corresponds to our type // TODO verify Mnid = M*; our nid corresponds to our type
...@@ -187,7 +187,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -187,7 +187,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
} }
// update cluster state // update cluster state
node.updateOperational(func() { node.updateOperational(func() { // XXX -> updateState
err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown err = node.updateNodeTab(ctx, &mnt) // the only err is cmdShutdown
node.State.PartTab = pt node.State.PartTab = pt
if err == nil { if err == nil {
...@@ -391,7 +391,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -391,7 +391,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
// TODO .State = DOWN -> ResetLink // TODO .State = DOWN -> ResetLink
if nodeInfo.NID == node.MyInfo.NID { if nodeInfo.NID == node.MyInfo.NID {
// XXX recheck locking // XXX recheck locking -> under .updateState
node.MyInfo = nodeInfo node.MyInfo = nodeInfo
// NEO/py currently employs this hack // NEO/py currently employs this hack
......
...@@ -31,7 +31,7 @@ import ( ...@@ -31,7 +31,7 @@ import (
// NodeState represent state maintained on a Node. // NodeState represent state maintained on a Node.
type NodeState struct { type NodeState struct {
NID proto.NodeID // nid received by node from M NID proto.NodeID // nid received by node from M XXX -> MasteredState ? (because .mlink is there ?)
IdTime proto.IdTime // when this node last identified to master IdTime proto.IdTime // when this node last identified to master
NodeTab *NodeTable // nodes in the cluster NodeTab *NodeTable // nodes in the cluster
...@@ -105,7 +105,7 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot { ...@@ -105,7 +105,7 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
// - current cluster state. // - current cluster state.
type Node struct { type Node struct {
// MyInfo proto.NodeInfo // type, laddr, nid, state, idtime XXX nid is managed by M // MyInfo proto.NodeInfo // type, laddr, nid, state, idtime XXX nid is managed by M
MyState NodeState State NodeState
// XXX type, laddr // XXX type, laddr
ClusterName string ClusterName string
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
...@@ -122,22 +122,19 @@ type Node struct { ...@@ -122,22 +122,19 @@ type Node struct {
// NewNode creates new node. // NewNode creates new node.
func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *Node { func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *Node {
node := &Node{ node := &Node{
MyInfo: proto.NodeInfo{ State {
Type: typ,
Addr: proto.Address{},
NID: proto.NID(typ, 0), // temp, e.g. S? TODO use "temp" bit in NodeID NID: proto.NID(typ, 0), // temp, e.g. S? TODO use "temp" bit in NodeID
IdTime: proto.IdTimeNone, IdTime: proto.IdTimeNone,
},
NodeTab: &NodeTable{},
PartTab: &PartitionTable{},
ClusterState: -1, // invalid
}
ClusterName: clusterName, ClusterName: clusterName,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddr: masterAddr,
State: ClusterState{
NodeTab: &NodeTable{},
PartTab: &PartitionTable{},
Code: -1, // invalid
},
} }
node.State.NodeTab.localNode = node node.State.NodeTab.localNode = node
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment