Commit 1cc74a85 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 4755152d
...@@ -31,6 +31,11 @@ import ( ...@@ -31,6 +31,11 @@ import (
type Master struct { type Master struct {
clusterName string clusterName string
clusterState ClusterState clusterState ClusterState
// master manages node and partition tables and broadcast their updates
// to all nodes in cluster
nodeTab NodeTable
partTab PartitionTable
} }
func NewMaster(clusterName string) *Master { func NewMaster(clusterName string) *Master {
...@@ -52,7 +57,42 @@ func (m *Master) SetClusterState(state ClusterState) { ...@@ -52,7 +57,42 @@ func (m *Master) SetClusterState(state ClusterState) {
func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
fmt.Printf("master: %s: serving new node\n", link) fmt.Printf("master: %s: serving new node\n", link)
// TODO // close link when either cancelling or returning (e.g. due to an error)
// ( when cancelling - link.Close will signal to all current IO to
// terminate with an error )
// XXX dup -> utility
retch := make(chan struct{})
defer func() { close(retch) }()
go func() {
select {
case <-ctx.Done():
// XXX tell peers we are shutting down?
// XXX ret err = ctx.Err()
case <-retch:
}
fmt.Printf("master: %v: closing link\n", link)
link.Close() // XXX err
}()
// identify
nodeInfo, err := IdentifyPeer(link, MASTER)
if err != nil {
fmt.Printf("master: %v\n", err)
return
}
// add info to nodeTab
m.nodeTab.Lock()
m.nodeTab.Add(&Node{nodeInfo, link})
m.nodeTab.Unlock()
// TODO subscribe to nodeTab and broadcast updates
// identification passed, now serve other requests
// client: notify + serve requests
// storage: notify + ?
} }
// ServeClient serves incoming connection on which peer identified itself as client // ServeClient serves incoming connection on which peer identified itself as client
......
...@@ -22,15 +22,6 @@ import ( ...@@ -22,15 +22,6 @@ import (
"sync" "sync"
) )
// Node represents a node from local-node point of view
type Node struct {
Info NodeInfo
Link *NodeLink // link to this node; =nil if not connected
// XXX identified or not ?
}
// NodeTable represents all nodes in a cluster // NodeTable represents all nodes in a cluster
// //
// Usually Master maintains such table and provides it to other nodes to know // Usually Master maintains such table and provides it to other nodes to know
...@@ -70,9 +61,23 @@ type Node struct { ...@@ -70,9 +61,23 @@ type Node struct {
// //
type NodeTable struct { type NodeTable struct {
// users have to care locking explicitly // users have to care locking explicitly
sync.Mutex // XXX -> RWMutex ? sync.RWMutex
nodev []Node
ver int // ↑ for versioning XXX do we need this?
} }
// Node represents a node entry in NodeTable
type Node struct {
Info NodeInfo // XXX extract ?
Link *NodeLink // link to this node; =nil if not connected XXX do we need it here ?
// XXX identified or not ?
}
// UpdateNode updates information about a node // UpdateNode updates information about a node
func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) { func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) {
// TODO // TODO
...@@ -80,7 +85,25 @@ func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) { ...@@ -80,7 +85,25 @@ func (nt *NodeTable) UpdateNode(nodeInfo NodeInfo) {
// XXX ? ^^^ UpdateNode is enough ? // XXX ? ^^^ UpdateNode is enough ?
func (nt *NodeTable) Add(node *Node) { func (nt *NodeTable) Add(node *Node) {
// TODO // XXX check node is already there
// XXX pass/store node by pointer ?
nt.nodev = append(nt.nodev, *node)
} }
// TODO subscribe for changes on Add ? (notification via channel) // TODO subscribe for changes on Add ? (notification via channel)
func (nt *NodeTable) String() string {
//nt.RLock() // FIXME -> it must be client
//defer nt.RUnlock()
buf := bytes.Buffer{}
for node := range nl.nodev {
// XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", node.NodeID, node.NodeType, node.NodeState, node.Address)
}
return buf.String()
}
...@@ -103,9 +103,11 @@ package neo ...@@ -103,9 +103,11 @@ package neo
// storages to executed them, and broadcasts partition table updates to all // storages to executed them, and broadcasts partition table updates to all
// nodes in the cluster. // nodes in the cluster.
type PartitionTable struct { type PartitionTable struct {
// XXX do we need sync.Mutex here for updates ?
ptTab []PartitionCell // [#Np] ptTab []PartitionCell // [#Np]
ptId // ↑ for versioning ptId int // ↑ for versioning XXX -> ver ?
} }
// PartitionCell describes one storage in a ptid entry in partition table // PartitionCell describes one storage in a ptid entry in partition table
......
...@@ -43,6 +43,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error { ...@@ -43,6 +43,7 @@ func Serve(ctx context.Context, l net.Listener, srv Server) error {
// close listener when either cancelling or returning (e.g. due to an error) // close listener when either cancelling or returning (e.g. due to an error)
// ( when cancelling - listener close will signal to all accepts to // ( when cancelling - listener close will signal to all accepts to
// terminate with an error ) // terminate with an error )
// XXX dup -> utility
retch := make(chan struct{}) retch := make(chan struct{})
defer func() { close(retch) }() defer func() { close(retch) }()
go func() { go func() {
...@@ -107,14 +108,6 @@ func errcontextf(errp *error, format string, argv ...interface{}) { ...@@ -107,14 +108,6 @@ func errcontextf(errp *error, format string, argv ...interface{}) {
func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) { func IdentifyPeer(link *NodeLink, myNodeType NodeType) (nodeInfo RequestIdentification /*TODO -> NodeInfo*/, err error) {
defer errcontextf(&err, "%s: identify", link) defer errcontextf(&err, "%s: identify", link)
/*
defer func() {
if err != nil {
err = fmt.Errorf("%s: identify: %s", link, err)
}
}()
*/
// the first conn must come with RequestIdentification packet // the first conn must come with RequestIdentification packet
conn, err := link.Accept() conn, err := link.Accept()
if err != nil { if err != nil {
......
...@@ -59,7 +59,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -59,7 +59,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
select { select {
case <-ctx.Done(): case <-ctx.Done():
// XXX tell peers we are shutting down? // XXX tell peers we are shutting down?
// XXX ret err = cancelled ? // XXX ret err = ctx.Err()
case <-retch: case <-retch:
} }
fmt.Printf("stor: %v: closing link\n", link) fmt.Printf("stor: %v: closing link\n", link)
...@@ -68,8 +68,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -68,8 +68,7 @@ func (stor *Storage) ServeLink(ctx context.Context, link *NodeLink) {
nodeInfo, err := IdentifyPeer(link, STORAGE) nodeInfo, err := IdentifyPeer(link, STORAGE)
if err != nil { if err != nil {
// XXX include link here or in IdentifyPeer ? fmt.Printf("stor: %v\n", err)
fmt.Printf("peer identification failed: %v\n", err)
return return
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment