Commit e8ac7104 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent e17082f1
......@@ -68,7 +68,6 @@ type Master struct {
type nodeCome struct {
conn *neo.Conn
idReq *neo.RequestIdentification // we received this identification request
idResp chan neo.Msg // what we reply (AcceptIdentification | Error) XXX kill
}
// event: node disconnects
......@@ -168,7 +167,7 @@ func (m *Master) Run(ctx context.Context) error {
}
select {
case m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}:
case m.nodeCome <- nodeCome{conn, idReq}:
// ok
case <-serveCtx.Done():
......@@ -256,29 +255,52 @@ func (m *Master) recovery(ctx context.Context) (err error) {
defer rcancel()
recovery := make(chan storRecovery)
inprogress := 0
inprogress := sync.WaitGroup{}
// start recovery on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
go storCtlRecovery(rctx, stor.Link, recovery)
inprogress.Add(1)
go func() {
defer inprogress.Done()
storCtlRecovery(rctx, stor.Link, recovery)
}()
}
}
loop:
for {
select {
// XXX this is m.Accept() and semantic must be semantic of net.Accept() !
// new connection comes in
case n := <-m.nodeCome:
node, ok := m.accept(n, /* XXX only accept storages -> PENDING */)
if !ok {
break
}
node, resp, ok := m.accept(n, /* XXX only accept storages -> PENDING */)
// if new storage arrived - start recovery on it too
inprogress.Add(1)
go func() {
defer inprogress.Done()
// send identification response
// XXX cancel on ctx
err := n.conn.Send(resp)
if err != nil {
// XXX log
}
n.conn.Close() // XXX err
if err != nil || !ok {
// XXX must let recovery know it failed to update nodeTab
if ok {
recovery <- storRecovery{err: err}
}
c.conn.Link().Close() // XXX err
return
}
// new storage arrived - start recovery on it too
inprogress++
go storCtlRecovery(rctx, node.Link, recovery)
err := welcome(n.conn, resp)
// start recovery
storCtlRecovery(rctx, node.Link, recovery)
}()
case n := <-m.nodeLeave:
m.nodeTab.UpdateLinkDown(n.link)
......@@ -287,12 +309,14 @@ loop:
// a storage node came through recovery - let's see whether
// ptid ↑ and if so we should take partition table from there
case r := <-recovery:
inprogress--
// inprogress--
if r.err != nil {
// XXX err ctx?
// XXX log here or in producer?
fmt.Printf("master: %v\n", r.err)
// XXX close stor link / update .nodeTab ?
break
}
......@@ -337,9 +361,22 @@ loop:
}
}
// consume left recovery responses (which should come without delay since it was cancelled)
for ; inprogress > 0; inprogress-- {
<-recovery
// // consume left recovery responses (which should come without delay since it was cancelled)
// for ; inprogress > 0; inprogress-- {
// <-recovery
// }
done := make(chan struct{})
go func() {
inprogress.Wait()
close(done)
}()
for {
select {
case <-recovery:
case <-done:
return err
}
return err
......@@ -376,7 +413,7 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecov
}()
defer xerr.Contextf(&err, "%s: stor recovery", link)
conn, err := link.NewConn() // FIXME bad
conn, err := link.NewConn() // FIXME bad -> not bad
if err != nil {
return
}
......@@ -514,6 +551,8 @@ loop:
}
}
// XXX if m.partTab.OperationalWith(&.nodeTab, RUNNING) -> break (ok)
case ech := <-m.ctlStart:
ech <- nil // we are already starting
......@@ -541,7 +580,7 @@ loop:
type storVerify struct {
lastOid zodb.Oid
lastTid zodb.Tid
link *neo.NodeLink
link *neo.NodeLink // XXX -> Node
err error
}
......@@ -561,6 +600,8 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify)
// FIXME stub
conn, _ := link.NewConn()
// XXX NotifyPT (so storages save locally recovered PT)
locked := neo.AnswerLockedTransactions{}
err = conn.Ask(&neo.LockedTransactions{}, &locked)
if err != nil {
......@@ -648,16 +689,18 @@ loop:
return err
}
// accept processes identification request of just connected node and either accepts or declines it
// accept processes identification request of just connected node and either accepts or declines it.
// if node identification is accepted nodeTab is updated and corresponding node entry is returned
func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
// XXX no need for ok
func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg, ok bool) {
// TODO log node accept/rejected
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.node.ClusterName {
n.idResp <- &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return nil, false
return nil, &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}, false
}
nodeType := n.idReq.NodeType
......@@ -672,20 +715,19 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return nil, false
return nil, &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node" /*XXX*/}, false
}
// XXX accept only certain kind of nodes depending on .clusterState, e.g.
switch nodeType {
case neo.CLIENT:
n.idResp <- &neo.Error{neo.NOT_READY, "cluster not operational"}
return nil, &neo.Error{neo.NOT_READY, "cluster not operational"}
// XXX ...
}
n.idResp <- &neo.AcceptIdentification{
accept := &neo.AcceptIdentification{
NodeType: neo.MASTER,
MyNodeUUID: m.node.MyInfo.NodeUUID,
NumPartitions: 1, // FIXME hardcoded
......@@ -713,7 +755,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, ok bool) {
}
node = m.nodeTab.Update(nodeInfo, n.conn.Link()) // NOTE this notifies all nodeTab subscribers
return node, true
return node, accept, true
}
// allocUUID allocates new node uuid for a node of kind nodeType
......@@ -734,6 +776,7 @@ func (m *Master) allocUUID(nodeType neo.NodeType) neo.NodeUUID {
panic("all uuid allocated ???") // XXX more robust ?
}
/* XXX goes away
// ServeLink serves incoming node-node link connection
// XXX +error return?
func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
......@@ -781,9 +824,9 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
}
// convey identification request to master and we are done here - the master takes on the torch
m.nodeCome <- nodeCome{conn, idReq, nil/*XXX kill*/}
m.nodeCome <- nodeCome{conn, idReq, nilXXX kill}
/*
//////////////////////////////////////////////////
// if master accepted this node - don't forget to notify when it leaves
_, rejected := idResp.(error)
if !rejected {
......@@ -877,8 +920,9 @@ func (m *Master) ServeLink(ctx context.Context, link *neo.NodeLink) {
// storage:
m.DriveStorage(ctx, link)
*/
/////////////////
}
*/
// ServeClient serves incoming connection on which peer identified itself as client
// XXX +error return?
......
......@@ -79,6 +79,7 @@ func Serve(ctx context.Context, l *neo.Listener, srv Server) error {
}
*/
/*
// ----------------------------------------
// XXX goes away? (we need a func to make sure to recv RequestIdentification
......@@ -128,3 +129,4 @@ func IdentifyPeer(link *neo.NodeLink, myNodeType neo.NodeType) (nodeInfo neo.Req
return req, nil
}
*/
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment