Commit 24c4a165 authored by Kirill Smelkov

.

parent 373ef31b
...@@ -58,11 +58,13 @@ type Client struct { ...@@ -58,11 +58,13 @@ type Client struct {
runWG *xsync.WorkGroup runWG *xsync.WorkGroup
runCancel func() runCancel func()
/*
// link to master - established and maintained by talkMaster. // link to master - established and maintained by talkMaster.
// users retrieve it via masterLink(). // users retrieve it via masterLink().
mlinkMu sync.Mutex mlinkMu sync.Mutex
mlink *neonet.NodeLink mlink *neonet.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle mlinkReady chan struct{} // reinitialized at each new talk cycle
*/
/* /*
// operational state in node is maintained by recvMaster. // operational state in node is maintained by recvMaster.
...@@ -104,7 +106,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { ...@@ -104,7 +106,7 @@ func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
return &Client{ return &Client{
// node: xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr), // node: xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
mlinkReady: make(chan struct{}), // mlinkReady: make(chan struct{}),
// operational: false, // operational: false,
// opReady: make(chan struct{}), // opReady: make(chan struct{}),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
...@@ -165,6 +167,7 @@ func (c *Client) Close() (err error) { ...@@ -165,6 +167,7 @@ func (c *Client) Close() (err error) {
// --- connection with master --- // --- connection with master ---
/*
// masterLink returns link to primary master. // masterLink returns link to primary master.
// //
// NOTE that even if masterLink returns != nil, the master link can become // NOTE that even if masterLink returns != nil, the master link can become
...@@ -190,6 +193,7 @@ func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) { ...@@ -190,6 +193,7 @@ func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) {
} }
} }
} }
*/
/* /*
// updateOperational updates .operational from current state. // updateOperational updates .operational from current state.
...@@ -344,7 +348,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) { ...@@ -344,7 +348,7 @@ func (c *Client) talkMaster1(ctx context.Context) (err error) {
// syncMaster asks M for DB head right after identification. // syncMaster asks M for DB head right after identification.
func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) { func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "sync")(&err) defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ?
// query last_tid // query last_tid
lastTxn := proto.AnswerLastTransaction{} lastTxn := proto.AnswerLastTransaction{}
...@@ -466,28 +470,26 @@ func (c *Client) flushEventq0() { ...@@ -466,28 +470,26 @@ func (c *Client) flushEventq0() {
// --- user API calls --- // --- user API calls ---
// Sync implements zodb.IStorageDriver. // Sync implements zodb.IStorageDriver.
func (c *Client) Sync(ctx context.Context) (_ zodb.Tid, err error) { func (c *Client) Sync(ctx context.Context) (head zodb.Tid, err error) {
defer func() { defer func() {
if err != nil { if err != nil {
err = &zodb.OpError{URL: c.URL(), Op: "sync", Args: nil, Err: err} err = &zodb.OpError{URL: c.URL(), Op: "sync", Args: nil, Err: err}
} }
}() }()
// FIXME require full withOperational ? -> err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, _ *xneo.ClusterState) error {
mlink, err := c.masterLink(ctx) // XXX mlink can become down while we are making the call.
if err != nil { // XXX do we want to return error or retry?
return 0, err reply := proto.AnswerLastTransaction{}
} err = mlink.Ask1(&proto.LastTransaction{}, &reply) // XXX Ask += ctx
if err != nil {
// XXX mlink can become down while we are making the call. // XXX ZODBErrDecode?
// XXX do we want to return error or retry? return err
reply := proto.AnswerLastTransaction{} }
err = mlink.Ask1(&proto.LastTransaction{}, &reply) // XXX Ask += ctx head = reply.Tid
if err != nil { return nil
// XXX ZODBErrDecode? })
return 0, err return
}
return reply.Tid, nil
} }
// Load implements zodb.IStorageDriver. // Load implements zodb.IStorageDriver.
...@@ -500,7 +502,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z ...@@ -500,7 +502,7 @@ func (c *Client) Load(ctx context.Context, xid zodb.Xid) (buf *mem.Buf, serial z
// Retrieve storages we might need to access. // Retrieve storages we might need to access.
storv := make([]*xneo.Node, 0, 1) storv := make([]*xneo.Node, 0, 1)
err = c.node.WithOperational(ctx, func(cs *xneo.ClusterState) error { err = c.node.WithOperational(ctx, func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error {
for _, cell := range cs.PartTab.Get(xid.Oid) { for _, cell := range cs.PartTab.Get(xid.Oid) {
if cell.Readable() { if cell.Readable() {
stor := cs.NodeTab.Get(cell.NID) stor := cs.NodeTab.Get(cell.NID)
......
...@@ -65,22 +65,17 @@ type _MasteredNode struct { ...@@ -65,22 +65,17 @@ type _MasteredNode struct {
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry MasterAddr string // address of current master TODO -> masterRegistry
// nodeTab/partTab/clusterState
stateMu sync.RWMutex
state xneo.ClusterState
// operational state in node is maintained by TalkMaster. // operational state in node is maintained by TalkMaster.
// users retrieve it via withOperational(). XXX recheck // users retrieve it via WithOperational().
// //
// NOTE being operational means: // being operational means:
// - link to master established and is ok // - link to master established and is ok
// - .state is operational // - state of partTab/nodeTab is operational
// opMu sync.RWMutex
// however master link is accessed separately (see ^^^ and masterLink) mlink *neonet.NodeLink // link to master
// state xneo.ClusterState // nodeTab/partTab/clusterState
// protected by .stateMu opReady chan struct{} // reinitialized each time state becomes non-operational
operational bool operational bool // cache for state.IsOperational()
opReady chan struct{} // reinitialized each time state becomes non-operational
flags _MasteredNodeFlags flags _MasteredNodeFlags
...@@ -124,7 +119,8 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -124,7 +119,8 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
Code: -1, // invalid Code: -1, // invalid
}, },
rxm: make(chan _RxM), opReady: make(chan struct{}),
rxm: make(chan _RxM),
} }
return node return node
...@@ -224,7 +220,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -224,7 +220,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
// update cluster state // update cluster state
// XXX locking // XXX locking
node.stateMu.Lock() node.opMu.Lock()
err = node.updateNodeTab(ctx, &mnt) err = node.updateNodeTab(ctx, &mnt)
node.state.PartTab = pt node.state.PartTab = pt
// XXX update "operational" // XXX update "operational"
...@@ -234,7 +230,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -234,7 +230,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
c.node.StateMu.Unlock() c.node.StateMu.Unlock()
opready() opready()
*/ */
node.stateMu.Unlock() node.opMu.Unlock()
if err != nil { // might be command to shutdown if err != nil { // might be command to shutdown
return err return err
} }
...@@ -320,12 +316,12 @@ func (node *_MasteredNode) RecvM1() (neonet.Request, error) { ...@@ -320,12 +316,12 @@ func (node *_MasteredNode) RecvM1() (neonet.Request, error) {
func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt bool, err error) { func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt bool, err error) {
δpt = false δpt = false
node.stateMu.Lock() node.opMu.Lock()
// XXX defer unlock ? // XXX defer unlock ?
switch msg := msg.(type) { switch msg := msg.(type) {
default: default:
node.stateMu.Unlock() node.opMu.Unlock()
panic(fmt.Sprintf("unexpected message: %T", msg)) panic(fmt.Sprintf("unexpected message: %T", msg))
// <- whole partTab // <- whole partTab
...@@ -353,7 +349,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt ...@@ -353,7 +349,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
// update .operational + notify those who was waiting for it // update .operational + notify those who was waiting for it
opready := node.updateOperational() opready := node.updateOperational()
node.stateMu.Unlock() node.opMu.Unlock()
opready() opready()
return δpt, err return δpt, err
...@@ -361,7 +357,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt ...@@ -361,7 +357,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
// updateOperational updates .operational from current state. // updateOperational updates .operational from current state.
// //
// Must be called with .stateMu lock held. // Must be called with .opMu lock held.
// //
// Returned sendReady func must be called by updateOperational caller after // Returned sendReady func must be called by updateOperational caller after
// .node.StateMu lock is released - it will close current .opReady this way // .node.StateMu lock is released - it will close current .opReady this way
...@@ -377,7 +373,7 @@ func (node *_MasteredNode) updateOperational() (sendReady func()) { ...@@ -377,7 +373,7 @@ func (node *_MasteredNode) updateOperational() (sendReady func()) {
if operational != node.operational { if operational != node.operational {
node.operational = operational node.operational = operational
if operational { if operational {
opready = node.opReady // don't close from under stateMu opready = node.opReady // don't close from under opMu
} else { } else {
node.opReady = make(chan struct{}) // remake for next operational waiters node.opReady = make(chan struct{}) // remake for next operational waiters
} }
...@@ -394,16 +390,16 @@ func (node *_MasteredNode) updateOperational() (sendReady func()) { ...@@ -394,16 +390,16 @@ func (node *_MasteredNode) updateOperational() (sendReady func()) {
// WithOperational runs f during when cluster state is/becomes operational. // WithOperational runs f during when cluster state is/becomes operational.
// The cluster state is guaranteed not to change during f run. // The cluster state is guaranteed not to change during f run.
func (node *_MasteredNode) WithOperational(ctx context.Context, f func(cs *xneo.ClusterState) error) error { func (node *_MasteredNode) WithOperational(ctx context.Context, f func(mlink *neonet.NodeLink, cs *xneo.ClusterState) error) error {
for { for {
node.stateMu.RLock() node.opMu.RLock()
if node.operational { if node.operational {
//fmt.Printf("withOperation -> ready\n"); //fmt.Printf("withOperation -> ready\n");
break break
} }
ready := node.opReady ready := node.opReady
node.stateMu.RUnlock() node.opMu.RUnlock()
//fmt.Printf("withOperational - waiting on %v\n", ready) //fmt.Printf("withOperational - waiting on %v\n", ready)
...@@ -416,16 +412,16 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(cs *xneo. ...@@ -416,16 +412,16 @@ func (node *_MasteredNode) WithOperational(ctx context.Context, f func(cs *xneo.
} }
} }
// node.operational=y and node.stateMu is rlocked // node.operational=y and node.opMu is rlocked
defer node.stateMu.RUnlock() defer node.opMu.RUnlock()
return f(&node.state) return f(node.mlink, &node.state)
} }
var cmdShutdown = errors.New("master told us to shutdown") var cmdShutdown = errors.New("master told us to shutdown")
// updateNodeTab applies updates to .nodeTab from message and logs changes appropriately. // updateNodeTab applies updates to .nodeTab from message and logs changes appropriately.
// the only possible error is cmdShutdown. // the only possible error is cmdShutdown.
// must be called under .stateMu. // must be called under .opMu.
func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error { func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyNodeInformation) error {
// XXX msg.IdTime ? // XXX msg.IdTime ?
for _, nodeInfo := range msg.NodeList { for _, nodeInfo := range msg.NodeList {
...@@ -449,7 +445,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN ...@@ -449,7 +445,7 @@ func (node *_MasteredNode) updateNodeTab(ctx context.Context, msg *proto.NotifyN
} }
} }
// FIXME logging under lock ok? (if caller took e.g. .stateMu before applying updates) // FIXME logging under lock ok? (if caller took e.g. .opMu before applying updates)
log.Infof(ctx, "full nodetab:\n%s", node.state.NodeTab) log.Infof(ctx, "full nodetab:\n%s", node.state.NodeTab)
return nil return nil
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment