Commit 655c4cc3 authored by Levin Zimmermann's avatar Levin Zimmermann

Node: Add support for NEO cluster with > 1 master

Some NEO clusters have more than one master to gain a higher availability.
Before this patch NEO/go Node type only handled one master address. This
commit adjusts the node type and related bits so that it can support more than
one master node.
parent 7a0674c2
...@@ -80,9 +80,9 @@ var _ zodb.IStorageDriver = (*Client)(nil) ...@@ -80,9 +80,9 @@ var _ zodb.IStorageDriver = (*Client)(nil)
// //
// It will connect to master @masterAddr and identify with specified cluster name. // It will connect to master @masterAddr and identify with specified cluster name.
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewClient(clusterName, masterAddr string, net xnet.Networker) *Client { func NewClient(clusterName, masterAddrSlice []string, net xnet.Networker) *Client {
c := &Client{ c := &Client{
node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddr), node: newMasteredNode(proto.CLIENT, clusterName, net, masterAddrSlice),
at0Ready: make(chan struct{}), at0Ready: make(chan struct{}),
closed: make(chan struct{}), closed: make(chan struct{}),
} }
...@@ -483,7 +483,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) ( ...@@ -483,7 +483,7 @@ func openClientByURL(ctx context.Context, u *url.URL, opt *zodb.DriverOptions) (
return nil, zodb.InvalidTid, err return nil, zodb.InvalidTid, err
} }
c := NewClient(name, u.Host, net) c := NewClient(name, []string{u.Host}, net)
c.ownNet = true c.ownNet = true
c.watchq = opt.Watchq c.watchq = opt.Watchq
defer func() { defer func() {
......
...@@ -290,7 +290,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) { ...@@ -290,7 +290,7 @@ func StartNEOGoSrv(opt NEOSrvOptions) (_ *NEOGoSrv, err error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
n.S = NewStorage(opt.name, n.masterAddr(), net, n.Sback) n.S = NewStorage(opt.name, []string{n.masterAddr()}, net, n.Sback)
serveWG.Go(func(ctx context.Context) error { serveWG.Go(func(ctx context.Context) error {
return n.S.Run(ctx, n.Sl) return n.S.Run(ctx, n.Sl)
}) })
......
...@@ -160,7 +160,7 @@ func (_ *_ΔStateCode) δClusterState() {} ...@@ -160,7 +160,7 @@ func (_ *_ΔStateCode) δClusterState() {}
// Use Run to actually start running the node. // Use Run to actually start running the node.
func NewMaster(clusterName string, net xnet.Networker) *Master { func NewMaster(clusterName string, net xnet.Networker) *Master {
return &Master{ return &Master{
node: xneo.NewNode(proto.MASTER, clusterName, net, ""), node: xneo.NewNode(proto.MASTER, clusterName, net, []string{}),
ctlStart: make(chan chan error), ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}), ctlStop: make(chan chan struct{}),
...@@ -219,7 +219,9 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) { ...@@ -219,7 +219,9 @@ func (m *Master) Run(ctx context.Context, l xnet.Listener) (err error) {
if err != nil { if err != nil {
return err return err
} }
m.node.MasterAddr = addr.String() // NOTE: How can this master node know the address of the
// other master nodes?
m.node.MasterAddrSlice = []string{addr.String()}
m.node.MyInfo = proto.NodeInfo{ m.node.MyInfo = proto.NodeInfo{
Type: proto.MASTER, Type: proto.MASTER,
Addr: naddr, Addr: naddr,
......
...@@ -85,9 +85,9 @@ const ( ...@@ -85,9 +85,9 @@ const (
) )
// newMasteredNode creates new _MasteredNode that connects to masterAddr/cluster via net. // newMasteredNode creates new _MasteredNode that connects to masterAddr/cluster via net.
func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *_MasteredNode { func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddrSlice []string) *_MasteredNode {
node := &_MasteredNode{ node := &_MasteredNode{
Node: xneo.NewNode(typ, clusterName, net, masterAddr), Node: xneo.NewNode(typ, clusterName, net, masterAddrSlice),
opReady: make(chan struct{}), opReady: make(chan struct{}),
} }
...@@ -108,13 +108,24 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker, ...@@ -108,13 +108,24 @@ func newMasteredNode(typ proto.NodeType, clusterName string, net xnet.Networker,
func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) { func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Context, *_MasterLink) error) (err error) {
// start logging with initial NID (that might be temporary, and which master can tell us to change) // start logging with initial NID (that might be temporary, and which master can tell us to change)
ctx0 := ctx ctx0 := ctx
// When a node is created with "NewNode", we don't know yet, which of the
// the provided master nodes is the primary master. We'll figure this our here
// and simply start with the first node.
// We don't assign 'MasterAddr' in 'NewNode', because master nodes are also
// created with the 'NewNode' method and master nodes don't know their Addr yet
// during initialization time: it's only assigned later when 'Run' is called.
if (node.MasterAddr == "") {
node.MasterAddr = node.MasterAddrSlice[0]
}
defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, node.MasterAddr)(&err) defer task.Runningf(&ctx, "%s: talk master(%s)", node.MyInfo.NID, node.MasterAddr)(&err)
for { for {
node.updateOperational(func() { node.updateOperational(func() {
node.mlink = nil node.mlink = nil
}) })
err := node.talkMaster1(ctx, ctx0, f) err := node.talkMaster1(ctx, ctx0, f, node.MasterAddr)
log.Warning(ctx, err) // XXX Warning -> Error? log.Warning(ctx, err) // XXX Warning -> Error?
if errors.Is(err, cmdShutdown) { if errors.Is(err, cmdShutdown) {
return err // M commands to shutdown return err // M commands to shutdown
...@@ -134,7 +145,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex ...@@ -134,7 +145,7 @@ func (node *_MasteredNode) TalkMaster(ctx context.Context, f func(context.Contex
} }
} }
func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error) error { func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(context.Context, *_MasterLink) error, masterAddr string) error {
reqID := &proto.RequestIdentification{ reqID := &proto.RequestIdentification{
NodeType: node.MyInfo.Type, NodeType: node.MyInfo.Type,
NID: node.MyInfo.NID, NID: node.MyInfo.NID,
...@@ -144,7 +155,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func( ...@@ -144,7 +155,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
DevPath: nil, // XXX stub DevPath: nil, // XXX stub
NewNID: nil, // XXX stub NewNID: nil, // XXX stub
} }
mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, node.MasterAddr, reqID) mlink, accept, err := xneo.Dial(ctx, proto.MASTER, node.Net, masterAddr, reqID)
if err != nil { if err != nil {
return err return err
} }
......
...@@ -319,7 +319,7 @@ func (t *tCluster) Storage(name string) ITestStorage { ...@@ -319,7 +319,7 @@ func (t *tCluster) Storage(name string) ITestStorage {
// {New,}Client are similar to {New,}Master but for client nodes. // {New,}Client are similar to {New,}Master but for client nodes.
func (t *tCluster) NewClient(name, masterAddr string) ITestClient { func (t *tCluster) NewClient(name, masterAddr string) ITestClient {
tnode := t.registerNewNode(name) tnode := t.registerNewNode(name)
c := NewClient(t.name, masterAddr, tnode.net) c := NewClient(t.name, []string{masterAddr}, tnode.net)
t.gotracer.RegisterNode(c.node.Node, name) t.gotracer.RegisterNode(c.node.Node, name)
t.runWG.Go(func(ctx context.Context) error { t.runWG.Go(func(ctx context.Context) error {
return c.Run(ctx) return c.Run(ctx)
...@@ -346,7 +346,7 @@ type tStorage struct { ...@@ -346,7 +346,7 @@ type tStorage struct {
func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage { func tNewStorage(clusterName, masterAddr, serveAddr string, net xnet.Networker, back storage.Backend) *tStorage {
return &tStorage{ return &tStorage{
Storage: NewStorage(clusterName, masterAddr, net, back), Storage: NewStorage(clusterName, []string{masterAddr}, net, back),
serveAddr: serveAddr, serveAddr: serveAddr,
} }
} }
......
...@@ -80,6 +80,7 @@ type Node struct { ...@@ -80,6 +80,7 @@ type Node struct {
ClusterName string ClusterName string
Net xnet.Networker // network AP we are sending/receiving on Net xnet.Networker // network AP we are sending/receiving on
MasterAddr string // address of current master TODO -> masterRegistry MasterAddr string // address of current master TODO -> masterRegistry
MasterAddrSlice []string // address of all known masters
// XXX reconsider not using State and have just .NodeTab, .PartTab, .ClusterState // XXX reconsider not using State and have just .NodeTab, .PartTab, .ClusterState
// StateMu sync.RWMutex // <- XXX unexport ? XXX not used -> move to MasteredNode ? // StateMu sync.RWMutex // <- XXX unexport ? XXX not used -> move to MasteredNode ?
...@@ -90,7 +91,7 @@ type Node struct { ...@@ -90,7 +91,7 @@ type Node struct {
} }
// NewNode creates new node. // NewNode creates new node.
func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddr string) *Node { func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterAddrSlice []string) *Node {
node := &Node{ node := &Node{
MyInfo: proto.NodeInfo{ MyInfo: proto.NodeInfo{
Type: typ, Type: typ,
...@@ -101,7 +102,7 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA ...@@ -101,7 +102,7 @@ func NewNode(typ proto.NodeType, clusterName string, net xnet.Networker, masterA
ClusterName: clusterName, ClusterName: clusterName,
Net: net, Net: net,
MasterAddr: masterAddr, MasterAddrSlice: masterAddrSlice,
State: ClusterState{ State: ClusterState{
NodeTab: &NodeTable{}, NodeTab: &NodeTable{},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment