Commit 791e14e9 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 9f6f2e91
...@@ -52,36 +52,11 @@ import ( ...@@ -52,36 +52,11 @@ import (
// Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces. // Client is NEO node that talks to NEO cluster and exposes access to it via ZODB interfaces.
type Client struct { type Client struct {
// node *xneo.NodeApp
node *_MasteredNode node *_MasteredNode
// runWG *xsync.WorkGroup // runWG *xsync.WorkGroup
runCancel func() runCancel func()
/*
// link to master - established and maintained by talkMaster.
// users retrieve it via masterLink().
mlinkMu sync.Mutex
mlink *neonet.NodeLink
mlinkReady chan struct{} // reinitialized at each new talk cycle
*/
/*
// operational state in node is maintained by recvMaster.
// users retrieve it via withOperational().
//
// NOTE being operational means:
// - link to master established and is ok
// - .PartTab is operational wrt .NodeTab
// - .ClusterState = RUNNING <- XXX needed?
//
// however master link is accessed separately (see ^^^ and masterLink)
//
// protected by .node.StateMu
operational bool // XXX <- somehow move to NodeApp?
opReady chan struct{} // reinitialized each time state becomes non-operational
*/
// driver client <- watcher: database commits | errors. // driver client <- watcher: database commits | errors.
watchq chan<- zodb.Event watchq chan<- zodb.Event
head zodb.Tid // last invalidation received from server head zodb.Tid // last invalidation received from server
...@@ -104,11 +79,7 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -104,11 +79,7 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client {
return &Client{ return &Client{
// node: xneo.NewNodeApp(net, proto.CLIENT, clusterName, masterAddr),
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr),
// mlinkReady: make(chan struct{}),
// operational: false,
// opReady: make(chan struct{}),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
} }
} }
...@@ -165,187 +136,6 @@ func (c *Client) Close() (err error) { ...@@ -165,187 +136,6 @@ func (c *Client) Close() (err error) {
return err return err
} }
// --- connection with master ---
/*
// masterLink returns link to primary master.
//
// NOTE that even if masterLink returns != nil, the master link can become
// non-operational at any later time. (such cases will be reported as
// ErrLinkDown returned by all mlink operations)
func (c *Client) masterLink(ctx context.Context) (*neonet.NodeLink, error) {
for {
c.mlinkMu.Lock()
mlink := c.mlink
ready := c.mlinkReady
c.mlinkMu.Unlock()
if mlink != nil {
return mlink, nil
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-ready:
// ok - try to relock mlinkMu and read mlink again.
}
}
}
*/
/*
// updateOperational updates .operational from current state.
//
// Must be called with .node.StateMu lock held.
//
// Returned sendReady func must be called by updateOperational caller after
// .node.StateMu lock is released - it will close current .opReady this way
// notifying .operational waiters.
//
// XXX move somehow -> NodeApp?
func (c *Client) updateOperational() (sendReady func()) {
// XXX py client does not wait for cluster state = running
operational := // c.node.ClusterState == ClusterRunning &&
c.node.PartTab.OperationalWith(c.node.NodeTab)
//fmt.Printf("\nupdateOperatinal: %v\n", operational)
//fmt.Println(c.node.PartTab)
//fmt.Println(c.node.NodeTab)
var opready chan struct{}
if operational != c.operational {
c.operational = operational
if operational {
opready = c.opReady // don't close from under StateMu
} else {
c.opReady = make(chan struct{}) // remake for next operational waiters
}
}
return func() {
if opready != nil {
//fmt.Println("updateOperational - notifying %v\n", opready)
close(opready)
}
}
}
*/
/*
// withOperational waits for cluster state to be operational.
//
// If successful it returns with operational state RLocked (c.node.StateMu) and
// unlocked otherwise.
//
// The only error possible is if provided ctx cancels.
// XXX and client stopped/closed? (ctx passed to Run cancelled)
//
// XXX change signature to call f from under withOperational ?
func (c *Client) withOperational(ctx context.Context) error {
for {
c.node.StateMu.RLock()
if c.operational {
//fmt.Printf("withOperation -> ready\n");
return nil
}
ready := c.opReady
c.node.StateMu.RUnlock()
//fmt.Printf("withOperational - waiting on %v\n", ready)
select {
case <-ctx.Done():
return ctx.Err()
case <-ready:
// ok - try to relock and read again.
}
}
}
*/
/*
func (c *Client) talkMaster1(ctx context.Context) (err error) {
mlink, accept, err := c.node.Dial(ctx, proto.MASTER, c.node.MasterAddr)
if err != nil {
// FIXME it is not only identification - e.g. ECONNREFUSED
return err
}
// FIXME vvv dup from Storage.talkMaster1
// XXX -> node.Dial / node.DialMaster ?
if accept.YourNID != c.node.MyInfo.NID {
log.Infof(ctx, "master told us to have nid=%v", accept.YourNID)
c.node.MyInfo.NID = accept.YourNID
}
wg, ctx := errgroup.WithContext(ctx) // XXX -> xsync.WorkGroup
defer xio.CloseWhenDone(ctx, mlink)()
// master pushes nodeTab and partTab to us right after identification
// XXX merge into -> node.DialMaster?
// nodeTab
mnt := proto.NotifyNodeInformation{}
_, err = mlink.Expect1(&mnt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
// partTab
mpt := proto.SendPartitionTable{}
_, err = mlink.Expect1(&mpt)
if err != nil {
return fmt.Errorf("after identification: %w", err)
}
pt := xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
log.Infof(ctx, "master initialized us with next parttab:\n%s", pt)
c.node.StateMu.Lock()
c.node.UpdateNodeTab(ctx, &mnt)
c.node.PartTab = pt
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
// set c.mlink and notify waiters
c.mlinkMu.Lock()
c.mlink = mlink
ready := c.mlinkReady
c.mlinkReady = make(chan struct{})
c.mlinkMu.Unlock()
close(ready)
// when we are done - reset .mlink
defer func() {
c.mlinkMu.Lock()
c.mlink = nil
c.mlinkMu.Unlock()
}()
c.head0 = c.head
// launch master notifications receiver
wg.Go(func() error {
return c.recvMaster(ctx, mlink)
})
// init lastTid from master
// TODO better change protocol for master to send us head via notify
// channel right after identification.
wg.Go(func() error {
return c.initFromMaster(ctx, mlink)
})
return wg.Wait()
}
*/
// syncMaster asks M for DB head right after identification. // syncMaster asks M for DB head right after identification.
func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) { func (c *Client) syncMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ? defer task.Running(&ctx, "sync0")(&err) // XXX unify with Sync ?
......
...@@ -65,7 +65,7 @@ type _MasteredNode struct { ...@@ -65,7 +65,7 @@ type _MasteredNode struct {
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry MasterAddr string // address of current master TODO -> masterRegistry
// operational state in node is maintained by TalkMaster. // operational state is maintained by TalkMaster.
// users retrieve it via WithOperational(). // users retrieve it via WithOperational().
// //
// being operational means: // being operational means:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment