Commit b8e0b935 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 32a92075
......@@ -31,7 +31,7 @@ import (
// Client talks to NEO cluster and exposes access it via ZODB interfaces
type Client struct {
neo.NodeCommon
node neo.NodeCommon
storLink *neo.NodeLink // link to storage node
storConn *neo.Conn // XXX main connection to storage
......@@ -107,7 +107,7 @@ func NewClient(storLink *neo.NodeLink) (*Client, error) {
// XXX move -> Run?
// first identify ourselves to peer
accept, err := neo.IdentifyWith(neo.STORAGE, storLink, cli.myInfo, cli.clusterName)
accept, err := neo.IdentifyWith(neo.STORAGE, storLink, cli.node.MyInfo, cli.node.ClusterName)
if err != nil {
return nil, err
}
......
......@@ -28,6 +28,9 @@ package neo
//go:generate sh -c "go run ../xcommon/tracing/cmd/gotrace/{gotrace,util}.go ."
import (
"net"
"lab.nexedi.com/kirr/neo/go/xcommon/xnet"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -44,7 +47,7 @@ const (
// NodeCommon is common data in all NEO nodes: Master, Storage & Client XXX text
// XXX naming -> Node ?
type NodeCommon struct {
MyInfo neo.NodeInfo // XXX -> only NodeUUID
MyInfo NodeInfo // XXX -> only NodeUUID
ClusterName string
Net xnet.Networker // network AP we are sending/receiving on
......@@ -68,7 +71,7 @@ func (n *NodeCommon) Listen() (net.Listener, error) {
// NOTE listen("tcp", ":1234") gives l.Addr 0.0.0.0:1234 and
// listen("tcp6", ":1234") gives l.Addr [::]:1234
// -> host is never empty
addr, err := neo.Addr(l.Addr())
addr, err := Addr(l.Addr())
if err != nil {
// XXX -> panic here ?
l.Close()
......
......@@ -86,10 +86,12 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
}
m := &Master{
myInfo: neo.NodeInfo{NodeType: neo.MASTER, Address: addr},
clusterName: clusterName,
net: net,
masterAddr: serveAddr, // XXX ok?
node: neo.NodeCommon{
MyInfo: neo.NodeInfo{NodeType: neo.MASTER, Address: addr},
ClusterName: clusterName,
Net: net,
MasterAddr: serveAddr, // XXX ok?
},
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
......@@ -99,7 +101,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
nodeLeave: make(chan nodeLeave),
}
m.myInfo.NodeUUID = m.allocUUID(neo.MASTER)
m.node.MyInfo.NodeUUID = m.allocUUID(neo.MASTER)
// TODO update nodeTab with self
m.clusterState = neo.ClusterRecovering // XXX no elections - we are the only master
......@@ -109,12 +111,12 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
// Run starts master node and runs it until ctx is cancelled or fatal error
func (m *Master) Run(ctx context.Context) error {
// start listening
l, err := m.Listen()
l, err := m.node.Listen()
if err != nil {
return err // XXX err ctx
}
m.masterAddr = l.Addr().String()
m.node.MasterAddr = l.Addr().String()
// serve incoming connections
wg := sync.WaitGroup{}
......@@ -152,7 +154,7 @@ func (m *Master) Start() error {
func (m *Master) Stop() {
ech := make(chan struct{})
m.ctlStop <- ech
return <-ech
<-ech
}
// Shutdown requests all known nodes in the cluster to stop
......@@ -636,7 +638,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName {
if n.idReq.ClusterName != m.node.ClusterName {
n.idResp <- &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return nil, false
}
......@@ -668,7 +670,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
n.idResp <- &neo.AcceptIdentification{
NodeType: neo.MASTER,
MyNodeUUID: m.myInfo.NodeUUID,
MyNodeUUID: m.node.MyInfo.NodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
......
......@@ -36,7 +36,7 @@ import (
// Storage is NEO node that keeps data and provides read/write access to it
type Storage struct {
neo.NodeCommon
node neo.NodeCommon
// context for providing operational service
// it is renewed every time master tells us StartOpertion, so users
......@@ -65,10 +65,13 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
}
stor := &Storage{
myInfo: neo.NodeInfo{NodeType: neo.STORAGE, Address: addr},
clusterName: cluster,
net: net,
masterAddr: masterAddr,
node: neo.NodeCommon{
MyInfo: neo.NodeInfo{NodeType: neo.STORAGE, Address: addr},
ClusterName: cluster,
Net: net,
MasterAddr: masterAddr,
},
zstor: zstor,
}
......@@ -121,9 +124,9 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
// XXX errctx
for {
fmt.Printf("stor: master(%v): connecting\n", stor.masterAddr) // XXX info
fmt.Printf("stor: master(%v): connecting\n", stor.node.MasterAddr) // XXX info
err := stor.talkMaster1(ctx)
fmt.Printf("stor: master(%v): %v\n", stor.masterAddr, err)
fmt.Printf("stor: master(%v): %v\n", stor.node.MasterAddr, err)
// TODO if err = shutdown -> return
......@@ -143,7 +146,7 @@ func (stor *Storage) talkMaster(ctx context.Context) error {
// it returns error describing why such cycle had to finish
// XXX distinguish between temporary problems and non-temporary ones?
func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
Mlink, err := neo.Dial(ctx, stor.net, stor.masterAddr)
Mlink, err := neo.Dial(ctx, stor.node.Net, stor.node.MasterAddr)
if err != nil {
return err
}
......@@ -155,7 +158,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
}()
// request identification this way registering our node to master
accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.myInfo, stor.clusterName)
accept, err := neo.IdentifyWith(neo.MASTER, Mlink, stor.node.MyInfo, stor.node.ClusterName)
if err != nil {
return err
}
......@@ -166,9 +169,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
return fmt.Errorf("TODO for 1-storage POC: Npt: %v Nreplica: %v", accept.NumPartitions, accept.NumReplicas)
}
if accept.YourNodeUUID != stor.myInfo.NodeUUID {
if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
fmt.Printf("stor: %v: master told us to have UUID=%v\n", Mlink, accept.YourNodeUUID)
stor.myInfo.NodeUUID = accept.YourNodeUUID
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
}
// now handle notifications and commands from master
......@@ -181,7 +184,7 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
default:
}
if err.IsShutdown(...) { // TODO
if err != nil /* TODO .IsShutdown(...) */ { // TODO
return err
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment