Commit 1d3db8a3 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5a6ce3e2
......@@ -35,6 +35,7 @@ import (
"lab.nexedi.com/kirr/go123/mem"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/go123/xnet"
"lab.nexedi.com/kirr/go123/xsync"
// "lab.nexedi.com/kirr/neo/go/internal/log"
"lab.nexedi.com/kirr/neo/go/internal/task"
......@@ -54,7 +55,8 @@ type Client struct {
// node *xneo.NodeApp
node *_MasteredNode
talkMasterCancel func()
runWG *xsync.WorkGroup
runCancel func()
// link to master - established and maintained by talkMaster.
// users retrieve it via masterLink().
......@@ -116,21 +118,26 @@ func (cli *Client) Run(ctx context.Context) (err error) {
// run process which performs master talk
ctx, cancel := context.WithCancel(ctx)
cli.talkMasterCancel = cancel
cli.runCancel = cancel
// cli.node.OnShutdown = cancel // XXX ok?
// return cli.talkMaster(ctx)
return cli.node.talkMaster(ctx)
cli.runWG.Go(cli.node.talkMaster)
cli.runWG.Go(cli.recvMaster)
return cli.runWG.Wait()
}
// Close implements zodb.IStorageDriver.
func (c *Client) Close() (err error) {
c.talkMasterCancel()
// XXX wait talkMaster finishes -> XXX return err from that?
// XXX what else?
c.runCancel()
err = c.runWG.Wait()
// close networker if configured to do so
if c.ownNet {
err = c.node.Net.Close()
err2 := c.node.Net.Close()
if err == nil {
err = err2
}
}
return err
}
......@@ -236,37 +243,6 @@ func (c *Client) withOperational(ctx context.Context) error {
*/
/*
// talkMaster connects to master, announces self and receives notifications.
// it tries to persist master link reconnecting as needed.
//
// TODO C -> M for commit (-> another channel)
//
// XXX always error (dup Storage.talkMaster) ?
func (c *Client) talkMaster(ctx context.Context) (err error) {
defer task.Runningf(&ctx, "client: talk master(%v)", c.node.MasterAddr)(&err)
// XXX dup wrt Storage.talkMaster
for {
// XXX .nodeTab.Reset() ?
err := c.talkMaster1(ctx)
log.Warning(ctx, err) // XXX Warning ok? -> Error?
// TODO if err == "reject identification / protocol error" -> shutdown client
// TODO if err = shutdown -> return
// exit on cancel / throttle reconnecting
select {
case <-ctx.Done():
return ctx.Err()
// XXX 1s hardcoded -> move out of here
case <-time.After(1*time.Second):
// ok
}
}
}
func (c *Client) talkMaster1(ctx context.Context) (err error) {
mlink, accept, err := c.node.Dial(ctx, proto.MASTER, c.node.MasterAddr)
if err != nil {
......@@ -385,67 +361,31 @@ func (c *Client) initFromMaster(ctx context.Context, mlink *neonet.NodeLink) (er
// recvMaster receives and handles notifications from master.
func (c *Client) recvMaster(ctx context.Context, mlink *neonet.NodeLink) (err error) {
defer task.Running(&ctx, "rx")(&err)
func (c *Client) recvMaster(ctx context.Context) (err error) {
defer task.Running(&ctx, "rx")(&err) // XXX recheck vs talkMaster
for {
req, err := mlink.Recv1() // XXX -> Recv1M
req, err := c.node.RecvM1()
if err != nil {
return err
return err // XXX eventReconnect
}
err = c.recvMaster1(ctx, req)
err = c.recvMaster1(req.Msg)
req.Close()
if err != nil {
return err
}
}
}
// recvMaster1 handles 1 message from master.
func (c *Client) recvMaster1(ctx context.Context, req neonet.Request) error {
switch msg := req.Msg.(type) {
func (c *Client) recvMaster1(msg proto.Msg) error {
switch msg := msg.(type) {
// <- committed txn
case *proto.InvalidateObjects:
return c.invalidateObjects(msg)
default:
return fmt.Errorf("unexpected message: %T", msg)
}
/*
// messages for state changes
// XXX -> NodeApp into common code to handle NodeTab + PartTab updates from M?
c.node.StateMu.Lock()
switch msg := req.Msg.(type) {
default:
c.node.statemu.unlock()
return fmt.Errorf("unexpected message: %T", msg)
// <- whole partTab
case *proto.SendPartitionTable:
c.node.UpdatePartTab(ctx, msg)
// <- δ(partTab)
case *proto.NotifyPartitionChanges:
panic("TODO δ(partTab)")
// <- δ(nodeTab)
case *proto.NotifyNodeInformation:
c.node.UpdateNodeTab(ctx, msg)
case *proto.NotifyClusterState:
c.node.UpdateClusterState(ctx, msg)
}
// update .operational + notify those who was waiting for it
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
return nil
*/
}
// invalidateObjects is called by recvMaster1 on receiving invalidateObjects notification.
......
......@@ -53,20 +53,16 @@ type _MasteredNode struct {
Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry
// nodeTab/partTab/clusterState
stateMu sync.RWMutex
state xneo.ClusterState
// nodeTab *xneo.NodeTable // information about nodes in the cluster
// partTab *xneo.PartitionTable // information about data distribution in the cluster
// clusterState proto.ClusterState // master idea about cluster state
// operational state in node is maintained by talkMaster.
// users retrieve it via withOperational(). XXX recheck
//
// NOTE being operational means:
// - link to master established and is ok
// - .partTab is operational wrt .nodeTab
// - .clusterState = RUNNING <- XXX needed?
// - .state is operational
//
// however master link is accessed separately (see ^^^ and masterLink)
//
......@@ -75,19 +71,40 @@ type _MasteredNode struct {
opReady chan struct{} // reinitialized each time state becomes non-operational
flags _MasteredNodeFlags
rxm chan _RxM // talkMaster -> RecvM1
/*
// TODO -> RecvM1 instead
// OnNotify, if !nil, is called when master notifies this node with a message.
// XXX not called for δstate
OnNotify func(msg proto.Msg) error
OnNotify func(msg proto.Msg) error // XXX kill
// OnNotifyδPartTab, if !nil, is called when master notifies this node
// with a change to partition table. (used by S to persist partTab)
OnNotifyδPartTab func(pt *xneo.PartitionTable) error
OnNotifyδPartTab func(pt *xneo.PartitionTable) error // XXX kill
*/
}
// _RxM represents a request or event received from master.
type _RxM struct {
Req neonet.Request
Err error // event*
}
type _MasteredNodeFlags int
const (
// δPartTabPassThrough tells RecvM1 not to filter out messages related
// to partition table changes. When RecvM1 receives such messages there
// are already processed internally to update .state.PartTab correspondingly.
//
// Storage uses this mode to receive δPartTab notifications to know
// when to persist it.
δPartTabPassThrough _MasteredNodeFlags = iota
)
// XXX doc
func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *_MasteredNode {
node := &_MasteredNode{
myInfo: proto.NodeInfo{
......@@ -106,6 +123,8 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
PartTab: &xneo.PartitionTable{},
Code: -1, // invalid
},
rxm: make(chan _RxM),
}
return node
......@@ -188,12 +207,19 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
err = node.updateNodeTab(ctx, &mnt)
node.state.PartTab = pt
// XXX update "operational"
/*
// update .operational + notify those who was waiting for it
opready := c.updateOperational()
c.node.StateMu.Unlock()
opready()
*/
node.stateMu.Unlock()
if err != nil { // might be command to shutdown
return err
}
// XXX update .masterLink + notify waiters
// XXX rxm <- eventReconnect
// receive and handle notifications from master
defer task.Running(&ctx, "rx")(&err)
......@@ -202,8 +228,7 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
if err != nil {
return err
}
err = node.recvMaster1(ctx, req.Msg)
req.Close()
err = node.recvMaster1(ctx, req) // req ownership is passed in
if err != nil {
return err
}
......@@ -213,10 +238,10 @@ func (node *_MasteredNode) talkMaster1(ctx context.Context) error {
}
// recvMaster1 handles 1 message from master.
func (node *_MasteredNode) recvMaster1(ctx context.Context, msg proto.Msg) (err error) {
func (node *_MasteredNode) recvMaster1(ctx context.Context, req neonet.Request) (err error) {
// messages for state changes are handled internally
δstate := true
switch msg.(type) {
switch req.Msg.(type) {
default: δstate = false
case *proto.SendPartitionTable: // whole partTab
case *proto.NotifyPartitionChanges: // δ(partTab)
......@@ -225,25 +250,46 @@ func (node *_MasteredNode) recvMaster1(ctx context.Context, msg proto.Msg) (err
}
if δstate {
err = node.recvδstate(ctx, msg)
} else {
// XXX other messages? -> particular user
// XXX rework protocol so that M sends δstate on dedicated connection and other messages on other connections?
if node.OnNotify != nil {
err = node.OnNotify(msg)
} else {
err = fmt.Errorf("unexpected message: %T", msg)
δpt, err := node.recvδstate(ctx, req.Msg)
toRecvM1 := false
if δpt && (node.flags & δPartTabPassThrough != 0) {
toRecvM1 = true
}
if !toRecvM1 {
req.Close()
return err
}
}
// pass request -> RecvM1
// NOTE req ownership is passed into RecvM1 caller who becomes responsible to close it
select {
case <-ctx.Done():
req.Close()
return ctx.Err()
case node.rxm <- _RxM{Req: req}:
// ok
}
return err
return nil
}
// RecvM1 receives request from master filtered through δstate handler.
//
// XXX eventReconnect
// XXX link down ?
func (node *_MasteredNode) RecvM1() (neonet.Request, error) {
rx := <-node.rxm
// XXX close -> EOF?
return rx.Req, rx.Err
}
//trace:event traceClusterStateChanged(cs *proto.ClusterState)
// recvδstate handles reception of δstate messages.
func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err error) {
δpt := false
func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt bool, err error) {
δpt = false
node.stateMu.Lock()
// XXX defer unlock ?
......@@ -255,6 +301,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
// XXX logging under lock ok?
log.Infof(ctx, "parttab update: %s", pt)
......@@ -262,11 +309,12 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
// <- δ(partTab)
case *proto.NotifyPartitionChanges:
δpt = true
panic("TODO δ(partTab)")
// <- δ(nodeTab)
case *proto.NotifyNodeInformation:
node.updateNodeTab(ctx, msg)
err = node.updateNodeTab(ctx, msg) // XXX recheck return (might be command to shutdown)
case *proto.NotifyClusterState:
log.Infof(ctx, "state update: %s", msg.State)
......@@ -274,18 +322,20 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (err
traceClusterStateChanged(&node.state.Code)
}
/* XXX kill
if δpt && node.OnNotifyδPartTab != nil {
err = node.OnNotifyδPartTab(node.state.PartTab)
// XXX err -> return without notify?
panic("TODO")
}
*/
// update .operational + notify those who was waiting for it
opready := node.updateOperational()
node.stateMu.Unlock()
opready()
return nil
return δpt, err
}
// updateOperational updates .operational from current state.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment