Commit 0a7b04d6 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent d9882191
...@@ -262,7 +262,7 @@ func (c *Client) flushEventq0() { ...@@ -262,7 +262,7 @@ func (c *Client) flushEventq0() {
// Sync implements zodb.IStorageDriver. // Sync implements zodb.IStorageDriver.
func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
c.WithState/*XXX Snapshot ?*/(func(s *xneo.State) { c.node.WithState(func(s *xneo.NodeState) {
ctx = taskctx.Runningf(ctx, "%s: zsync", s.MyNID) ctx = taskctx.Runningf(ctx, "%s: zsync", s.MyNID)
}) })
if glog.V(2) { if glog.V(2) {
...@@ -291,7 +291,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) { ...@@ -291,7 +291,7 @@ func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
// Load implements zodb.IStorageDriver. // Load implements zodb.IStorageDriver.
func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) { func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial zodb.Tid, err error) {
c.WithState(func(s *xneo.State) { c.node.WithState(func(s *xneo.State) {
ctx = taskctx.Runningf(ctx, "%s: zload %s", s.MyNID, xid) ctx = taskctx.Runningf(ctx, "%s: zload %s", s.MyNID, xid)
}) })
if glog.V(2) { if glog.V(2) {
......
...@@ -341,17 +341,36 @@ func (node *_MasteredNode) updateOperational(δf func()) { ...@@ -341,17 +341,36 @@ func (node *_MasteredNode) updateOperational(δf func()) {
} }
} }
/*
// WithState runs f with the guarantee that cluster state is not changed during the run. // WithState runs f with the guarantee that cluster state is not changed during the run.
func (node *_MasteredNode) WithState(f func(cs *xneo.ClusterState) error) { func (node *_MasteredNode) WithState(f func(cs *xneo.ClusterState) error) {
node.opMu.RLock() node.opMu.RLock()
defer node.opMu.RUnlock() defer node.opMu.RUnlock()
return f(&node.State) return f(&node.State)
} }
*/
// WithOperationalState runs f during when cluster state is/becomes operational. // WithOperationalState runs f during when cluster state is/becomes operational.
// The cluster state is guaranteed not to change during f run. // The cluster state is guaranteed not to change during f run.
func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error) error { func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlink *neonet.NodeLink, s *xneo.State) error) error {
for { for {
done := false
node.WithState(func(s *xneo.State) {
if !s.operational {
ready = node.opReady
return
}
//fmt.Printf("withOperation -> ready\n");
done = true
err = f(/*XXX -> s.mlink ?*/node.mlink, s)
})
if done {
return err
}
/*
node.opMu.RLock() node.opMu.RLock()
if node.operational { if node.operational {
//fmt.Printf("withOperation -> ready\n"); //fmt.Printf("withOperation -> ready\n");
...@@ -360,9 +379,9 @@ func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlin ...@@ -360,9 +379,9 @@ func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlin
ready := node.opReady ready := node.opReady
node.opMu.RUnlock() node.opMu.RUnlock()
*/
//fmt.Printf("withOperational - waiting on %v\n", ready) //fmt.Printf("withOperational - waiting on %v\n", ready)
select { select {
case <-ctx.Done(): case <-ctx.Done():
return fmt.Errorf("wait operational: %w", ctx.Err()) return fmt.Errorf("wait operational: %w", ctx.Err())
...@@ -372,9 +391,11 @@ func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlin ...@@ -372,9 +391,11 @@ func (node *_MasteredNode) WithOperationalState(ctx context.Context, f func(mlin
} }
} }
/*
// node.operational=y and node.opMu is rlocked // node.operational=y and node.opMu is rlocked
defer node.opMu.RUnlock() defer node.opMu.RUnlock()
return f(node.mlink, &node.State) return f(node.mlink, &node.State)
*/
} }
......
...@@ -153,6 +153,7 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA ...@@ -153,6 +153,7 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA
// XXX doc, naming // XXX doc, naming
// XXX WithStateHead ? WithStateSnapshot ?
func (node *Node) WithState(f func(*/*readonly*/State)) { func (node *Node) WithState(f func(*/*readonly*/State)) {
node.stateLogMu.Lock() node.stateLogMu.Lock()
s := node.state s := node.state
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment