Commit 4c4a0340 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 5d56a8eb
...@@ -162,7 +162,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master { ...@@ -162,7 +162,7 @@ func NewMaster(clusterName string, net xnet.Networker) *Master {
ctlShutdown: make(chan chan error), ctlShutdown: make(chan chan error),
nodeComeq: make(chan nodeCome), nodeComeq: make(chan nodeCome),
// nodeLeave: make(chan nodeLeave), nodeLeaveq: make(chan nodeLeave),
xtimeMono: xtime.Mono, xtimeMono: xtime.Mono,
} }
...@@ -438,6 +438,7 @@ loop: ...@@ -438,6 +438,7 @@ loop:
*/ */
}() }()
// XXX move up (before nodeComeq) ?
case n := <-m.nodeLeaveq: case n := <-m.nodeLeaveq:
// XXX -> move to func // XXX -> move to func
peer := n.peer peer := n.peer
...@@ -463,7 +464,6 @@ loop: ...@@ -463,7 +464,6 @@ loop:
r.stor.ResetLink(ctx) r.stor.ResetLink(ctx)
m.updateNodeState(ctx, r.stor, proto.DOWN) m.updateNodeState(ctx, r.stor, proto.DOWN)
// XXX stop sending nodeTab/partTab updates to this node // XXX stop sending nodeTab/partTab updates to this node
// XXX .nodeLeave <- r.stor ?
} }
} else { } else {
...@@ -1055,33 +1055,34 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms ...@@ -1055,33 +1055,34 @@ func (m *Master) serveClient1(ctx context.Context, req proto.Msg) (resp proto.Ms
// XXX place // XXX place
// called from main master process. XXX // called from main master process. XXX
func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode { func (m *Master) updateNodeTab(ctx context.Context, nodeInfo proto.NodeInfo) *xneo.PeerNode {
peer := m.node.State.NodeTab.Update(nodeInfo) node := m.node.State.NodeTab.Update(nodeInfo)
event := &_ΔNodeTab{nodeInfo} event := &_ΔNodeTab{nodeInfo}
// XXX locking // XXX locking
for nid, p := range m.peerTab { for nid, peer := range m.peerTab {
// TODO change limiting by buffer size to limiting by time - // TODO change limiting by buffer size to limiting by time -
// - i.e. detach peer if event queue grows more than 30s of time. // - i.e. detach peer if event queue grows more than 30s of time.
select { select {
case p.notifyq <- event: case peer.notifyq <- event:
continue // ok continue // ok
default: default:
} }
log.Warningf(ctx, "peer %s is slow -> detaching it", nid) log.Warningf(ctx, "peer %s is slow -> detaching it", nid)
close(p.notifyqOverflow) close(peer.notifyqOverflow)
// TODO delete(m.peerTab, nid) -> p.cancel() // TODO delete(m.peerTab, nid) -> p.cancel()
// XXX what else? // XXX what else?
panic("TODO") panic("TODO")
} }
return peer return node
} }
// XXX place // XXX place
// XXX doc // XXX doc
func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) { func (m *Master) updateNodeState(ctx context.Context, node *xneo.PeerNode, state proto.NodeState) {
nodei := node.NodeInfo nodei := node.NodeInfo
// XXX skip if .State == state ?
nodei.State = state nodei.State = state
m.updateNodeTab(ctx, nodei) m.updateNodeTab(ctx, nodei)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment