Commit 9afb634d authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent eb2becec
...@@ -139,12 +139,11 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -139,12 +139,11 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// subscribe to nodeTab/partTab/clusterState and notify peer with updates // subscribe to nodeTab/partTab/clusterState and notify peer with updates
m.stateMu.Lock() m.stateMu.Lock()
//clusterCh := make(chan ClusterState)
nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered() nodeCh, nodeUnsubscribe := m.nodeTab.SubscribeBuffered()
_ = nodeCh; _ = nodeUnsubscribe
//partCh, partUnsubscribe := m.partTab.SubscribeBuffered() //partCh, partUnsubscribe := m.partTab.SubscribeBuffered()
// TODO cluster subscribe // TODO cluster subscribe
//clusterCh := make(chan ClusterState)
//m.clusterNotifyv = append(m.clusterNotifyv, clusterCh) //m.clusterNotifyv = append(m.clusterNotifyv, clusterCh)
...@@ -159,25 +158,22 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) { ...@@ -159,25 +158,22 @@ func (m *Master) ServeLink(ctx context.Context, link *NodeLink) {
// ClusterInformation (PM -> * ?) // ClusterInformation (PM -> * ?)
m.stateMu.Unlock() m.stateMu.Unlock()
/*
go func() { go func() {
var updates []... for {
select {
case <-ctx.Done():
}() // TODO unsubscribe
// XXX we are not draining on cancel - how to free internal buffer ?
return
go func() {
var clusterState ClusterState case nodeUpdateV := <-nodeCh:
changed := false // TODO
select { case clusterState = <-clusterCh:
case clusterState = <-clusterCh: changed = true
changed = true }
}
}() }()
*/
// identification passed, now serve other requests // identification passed, now serve other requests
......
...@@ -88,7 +88,9 @@ type Node struct { ...@@ -88,7 +88,9 @@ type Node struct {
// Subscribe subscribes to NodeTable updates // Subscribe subscribes to NodeTable updates
// it returns a channel via which updates will be delivered // it returns a channel via which updates will be delivered and unsubscribe function
//
// XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) { func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) {
ch = make(chan *Node) // XXX how to specify ch buf size if needed ? ch = make(chan *Node) // XXX how to specify ch buf size if needed ?
nt.subscribev = append(nt.subscribev, ch) nt.subscribev = append(nt.subscribev, ch)
...@@ -108,13 +110,13 @@ func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) { ...@@ -108,13 +110,13 @@ func (nt *NodeTable) Subscribe() (ch chan *Node, unsubscribe func()) {
} }
// SubscribeBufferred subscribes to NodeTable updates without blocking updater // SubscribeBufferred subscribes to NodeTable updates without blocking updater
// it returns a channel via which updates are delivered // it returns a channel via which updates are delivered and unsubscribe function
// the updates are sent to destination in non-blocking way - if destination channel is not ready // the updates will be sent to destination in non-blocking way - if destination
// they will be bufferred. // channel is not ready they will be bufferred.
// it is the caller reponsibility to make sure such buffering does not grow up // it is the caller reponsibility to make sure such buffering does not grow up
// to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown // to infinity - via e.g. detecting stuck connections and unsubscribing on shutdown
// //
// must be called with stateMu held // XXX locking: client for subscribe/unsubscribe XXX ok?
func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) { func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) {
in, unsubscribe := nt.Subscribe() in, unsubscribe := nt.Subscribe()
ch = make(chan []*Node) ch = make(chan []*Node)
...@@ -142,7 +144,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) { ...@@ -142,7 +144,7 @@ func (nt *NodeTable) SubscribeBuffered() (ch chan []*Node, unsubscribe func()) {
break break
} }
// FIXME better merge as same node could be updated several times // FIXME merge updates as same node could be updated several times
updatev = append(updatev, update) updatev = append(updatev, update)
case out <- updatev: case out <- updatev:
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment