Commit 2c0345b9 authored by Kirill Smelkov

.

parent e02a2e7f
@@ -82,15 +82,14 @@ type NodeTable struct {
 }
 // Node represents a node entry in NodeTable
+// XXX naming -> NodeEntry?
 type Node struct {
-	NodeInfo       // XXX good idea to embed ?
-	Link *NodeLink // link to this node; =nil if not connected  XXX do we need it here ?
-	               // XXX identified or not ?
-	// XXX -> not needed - we only add something to nodetab after identification
-	// XXX ^^^ is about master. How about e.g. Client that received nodetab
-	//     entry and wants to talk to that node?
+	NodeInfo
+	// link to this node; =nil if not connected
+	Link *NodeLink
+	// XXX something indicating in-flight connection/identification
+	// (wish Link != nil means connected and identified)
 }
@@ -136,6 +135,12 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
 	return nil
 }
+// XXX doc
+func (nt *NodeTable) SetNodeState(node *Node, state neo.NodeState) {
+	node.NodeState = state
+	nt.notify(node.NodeInfo)
+}
 // UpdateLinkDown updates information about corresponding to link node and marks it as down
 // it returns corresponding node entry for convenience
 // XXX is this a good idea ?
@@ -146,9 +151,7 @@ func (nt *NodeTable) UpdateLinkDown(link *NodeLink) *Node {
 		panic("nodetab: UpdateLinkDown: no corresponding entry")
 	}
-	node.NodeState = DOWN
-	nt.notify(node.NodeInfo)
+	nt.SetNodeState(node, DOWN)
 	return node
 }
...
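The point of routing every state change through SetNodeState is that the mutation and the subscriber notification live in one place, so callers can no longer update NodeState and forget the notify; UpdateLinkDown in the hunk above then shrinks to a call to this setter. A standalone sketch of that pattern, not part of the commit and with deliberately simplified stand-ins for the neo types:

package main

import "fmt"

// Illustrative stand-ins; the real NodeInfo/NodeTable live in the neo package.
type NodeState int

const (
	DOWN NodeState = iota
	PENDING
	RUNNING
)

type NodeInfo struct {
	UUID  int
	State NodeState
}

type Node struct {
	NodeInfo
}

type NodeTable struct {
	nodev       []*Node
	subscribers []chan NodeInfo
}

// notify fans the updated NodeInfo out to every subscriber.
func (nt *NodeTable) notify(info NodeInfo) {
	for _, ch := range nt.subscribers {
		ch <- info
	}
}

// SetNodeState changes a node's state and notifies subscribers -
// the single place where mutation and notification are coupled.
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
	node.State = state
	nt.notify(node.NodeInfo)
}

func main() {
	nt := &NodeTable{}
	n := &Node{NodeInfo{UUID: 1, State: RUNNING}}
	nt.nodev = append(nt.nodev, n)

	events := make(chan NodeInfo, 1)
	nt.subscribers = append(nt.subscribers, events)

	nt.SetNodeState(n, DOWN)
	fmt.Println(<-events) // {1 0}
}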
@@ -72,7 +72,7 @@ type nodeCome struct {
 // event: node disconnects
 type nodeLeave struct {
-	link *neo.NodeLink // XXX better use uuid allocated on nodeCome
+	node *neo.Node
 }
 // NewMaster creates new master node that will listen on serveAddr.
@@ -241,6 +241,14 @@ func (m *Master) runMain(ctx context.Context) (err error) {
 // - monitor whether partition table becomes operational wrt currently up nodeset
 // - if yes - finish recovering upon receiving "start" command  XXX or autostart
+// storRecovery is result of 1 storage node passing recovery phase
+type storRecovery struct {
+	node    *Node
+	partTab neo.PartitionTable
+	// XXX + backup_tid, truncate_tid ?
+	err error
+}
 // recovery drives cluster during recovery phase
 //
 // when recovery finishes error indicates:
@@ -255,15 +263,16 @@ func (m *Master) recovery(ctx context.Context) (err error) {
 	defer rcancel()
 	recovery := make(chan storRecovery)
-	inprogress := sync.WaitGroup{}
+	wg := sync.WaitGroup{}
 	// start recovery on all storages we are currently in touch with
+	// XXX close links to clients
 	for _, stor := range m.nodeTab.StorageList() {
 		if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
-			inprogress.Add(1)
+			wg.Add(1)
 			go func() {
-				defer inprogress.Done()
-				storCtlRecovery(rctx, stor.Link, recovery)
+				defer wg.Done()
+				storCtlRecovery(rctx, stor, recovery)
 			}()
 		}
 	}
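The hunk above is the fan-out half of the recovery phase: one goroutine per storage, all reporting into the same recovery channel, with a WaitGroup tracking them. One detail worth noting: the goroutine closes over the range variable stor, and in Go versions before 1.22 such a closure sees whatever value the variable holds when the goroutine actually runs, so a safe version of this loop rebinds it per iteration. A self-contained sketch of the pattern, with hypothetical storage/result types rather than the NEO API:

package main

import (
	"context"
	"fmt"
	"sync"
)

// Illustrative stand-ins for the types used by the recovery loop.
type storage struct{ name string }

type recoveryResult struct {
	stor *storage
	err  error
}

// ctlRecovery stands in for storCtlRecovery: it always reports back on res.
func ctlRecovery(ctx context.Context, stor *storage, res chan<- recoveryResult) {
	res <- recoveryResult{stor: stor}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	storages := []*storage{{"s1"}, {"s2"}, {"s3"}}
	results := make(chan recoveryResult)
	wg := sync.WaitGroup{}

	// fan out: one worker per storage, all reporting to the same channel
	for _, stor := range storages {
		stor := stor // rebind so each goroutine gets its own copy (pre-Go 1.22)
		wg.Add(1)
		go func() {
			defer wg.Done()
			ctlRecovery(ctx, stor, results)
		}()
	}

	// close results once every worker has reported
	go func() {
		wg.Wait()
		close(results)
	}()

	for r := range results {
		fmt.Println(r.stor.name, r.err)
	}
}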
@@ -273,37 +282,32 @@ loop:
 		select {
 		// new connection comes in
 		case n := <-m.nodeCome:
-			node, resp, ok := m.accept(n, /* XXX only accept storages -> PENDING */)
+			node, resp := m.accept(n, /* XXX only accept storages -> PENDING */)
 			// if new storage arrived - start recovery on it too
-			inprogress.Add(1)
+			wg.Add(1)
 			go func() {
-				defer inprogress.Done()
-				// send identification response
-				// XXX cancel on ctx
-				err := n.conn.Send(resp)
-				if err != nil {
-					// XXX log
-				}
-				n.conn.Close() // XXX err
-				if err != nil || !ok {
-					// XXX must let recovery know it failed to update nodeTab
-					if ok {
-						recovery <- storRecovery{err: err}
-					}
-					c.conn.Link().Close() // XXX err
+				defer wg.Done()
+				if node == nil {
+					m.reject(n.conn, resp)
 					return
 				}
-				err := welcome(n.conn, resp)
+				err := m.welcome(n.conn, resp)
+				if err != nil {
+					// XXX better m.nodeLeave <- nodeLeave{node, err} ?
+					// XXX move this m.nodeLeave <- to welcome() ?
+					recovery <- storRecovery{node: node, err: err}
+				}
 				// start recovery
-				storCtlRecovery(rctx, node.Link, recovery)
+				storCtlRecovery(rctx, node, recovery)
 			}()
 		case n := <-m.nodeLeave:
-			m.nodeTab.UpdateLinkDown(n.link)
+			// XXX close n.node.Link ?
+			m.nodeTab.SetNodeState(n.node, DOWN)
 			// XXX update something indicating cluster currently can be operational or not ?
 		// a storage node came through recovery - let's see whether
@@ -316,7 +320,7 @@ loop:
 			// XXX log here or in producer?
 			fmt.Printf("master: %v\n", r.err)
-			// XXX close stor link / update .nodeTab ?
+			// XXX close stor link / update .nodeTab
 			break
 		}
@@ -366,9 +370,10 @@ loop:
 	//	<-recovery
 	// }
+	// wait all workers finish (which should come without delay since it was cancelled)
 	done := make(chan struct{})
 	go func() {
-		inprogress.Wait()
+		wg.Wait()
 		close(done)
 	}()
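wg.Wait() cannot sit directly inside a select, so the hunk above converts it into a closable channel: a helper goroutine waits and then closes done, which the shutdown code can select on while it keeps draining late results so no worker stays blocked on the recovery channel. A self-contained sketch of that shutdown pattern, with generic result types rather than the NEO code (note that a bare break inside select only leaves the select, hence the labeled break here):

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	results := make(chan string)
	wg := sync.WaitGroup{}

	// a few workers that may still be sending results after cancellation
	for i := 0; i < 3; i++ {
		i := i
		wg.Add(1)
		go func() {
			defer wg.Done()
			time.Sleep(time.Duration(i) * 10 * time.Millisecond)
			results <- fmt.Sprintf("late result %d", i)
		}()
	}

	// turn wg.Wait into a selectable event
	done := make(chan struct{})
	go func() {
		wg.Wait()
		close(done)
	}()

	// drain stragglers so no worker blocks forever on results;
	// exit once every worker has finished
loop:
	for {
		select {
		case r := <-results:
			fmt.Println("drained:", r)
		case <-done:
			break loop
		}
	}
}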
@@ -376,24 +381,16 @@ loop:
 	select {
 	case <-recovery:
 	case <-done:
-		return err
+		break
+	}
 	}
 	return err
 }
-// storRecovery is result of a storage node passing recovery phase
-type storRecovery struct {
-	node    *Node
-	partTab neo.PartitionTable
-	// XXX + backup_tid, truncate_tid ?
-	err error
-}
 // storCtlRecovery drives a storage node during cluster recovering state
 // it retrieves various ids and partition table from as stored on the storage
-func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecovery) {
+func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery) {
 	var err error
 	// XXX where this close link on error should be ?
 	defer func() {
@@ -402,7 +399,7 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecovery) {
 		}
 		// XXX on err still provide feedback to storRecovery chan ?
-		res <- storRecovery{err: err}
+		res <- storRecovery{node: stor, err: err}
 		/*
 		fmt.Printf("master: %v", err)
@@ -412,9 +409,9 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecovery) {
 		link.Close()
 		*/
 	}()
-	defer xerr.Contextf(&err, "%s: stor recovery", link)
-	conn, err := link.NewConn() // FIXME bad -> not bad
+	defer xerr.Contextf(&err, "%s: stor recovery", stor.link)
+	conn, err := stor.link.NewConn() // FIXME bad -> not bad
 	if err != nil {
 		return
 	}
@@ -450,7 +447,7 @@ func storCtlRecovery(ctx context.Context, link *neo.NodeLink, res chan storRecovery) {
 		}
 	}
-	res <- storRecovery{partTab: pt}
+	res <- storRecovery{node: stor, partTab: pt}
 }
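With the node field threaded through, every exit of storCtlRecovery now reports something on res that the recovery loop can attribute to a concrete storage: an error annotated with the node, or the retrieved partition table. A minimal sketch of that "always report, annotate errors with context" shape, using illustrative types rather than the real storCtlRecovery:

package main

import (
	"context"
	"errors"
	"fmt"
)

type node struct{ name string }

type result struct {
	node    *node
	partTab string // stands in for neo.PartitionTable
	err     error
}

// ctlRecovery reports exactly once on res: the deferred send fires on every
// early error return, and the success path sends the recovered data itself.
func ctlRecovery(ctx context.Context, stor *node, res chan<- result) {
	var err error
	defer func() {
		if err != nil {
			// annotate the error with which storage it belongs to
			res <- result{node: stor, err: fmt.Errorf("%s: stor recovery: %w", stor.name, err)}
		}
	}()

	pt, err := askPartTab(ctx, stor)
	if err != nil {
		return // deferred send delivers the error
	}
	res <- result{node: stor, partTab: pt}
}

// askPartTab is a hypothetical helper standing in for the recovery Ask exchange.
func askPartTab(ctx context.Context, stor *node) (string, error) {
	if stor.name == "bad" {
		return "", errors.New("connection refused")
	}
	return "pt-v1", nil
}

func main() {
	res := make(chan result, 2)
	ctlRecovery(context.Background(), &node{"good"}, res)
	ctlRecovery(context.Background(), &node{"bad"}, res)
	fmt.Println(<-res)
	fmt.Println(<-res)
}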
@@ -458,11 +455,15 @@ var errStopRequested = errors.New("stop requested")
 var errClusterDegraded = errors.New("cluster became non-operatonal")
-// Cluster Verification
-// --------------------
+// Cluster Verification (data recovery)
+// ------------------------------------
 //
 // - starts with operational partition table
-// - tell all storages to perform data verification (TODO) and retrieve last ids
+// - saves recovered partition table to all storages
+// - asks all storages for partly finished transactions and decides cluster-wide
+//   which such transactions need to be either finished or rolled back.
+// - executes decided finishes and rollbacks on the storages.
+// - retrieves last ids from storages along the way.
 // - once we are done without loosing too much storages in the process (so that
 //   partition table is still operational) we are ready to enter servicing state.
@@ -630,7 +631,8 @@ func storCtlVerify(ctx context.Context, link *neo.NodeLink, res chan storVerify) {
 // Cluster Running
 // ---------------
 //
-// - starts with operational parttab and (enough ?) present storage nodes passed verification
+// - starts with operational parttab and all present storage nodes having consistently
+//   either finished or rolled back partly finished transactions
 // - monitor storages come & go and if parttab becomes non-operational leave to recovery
 // - provide service to clients while we are here
 //
@@ -659,6 +661,7 @@ loop:
 		state := m.ClusterState
+		/*
 		wg.Add(1)
 		go func() {
 			defer wg.Done()
@@ -689,7 +692,9 @@ loop:
 				// TODO
 			}
 		}()
+		*/
+		/*
 		// a storage node came through recovery - if we are still recovering let's see whether
 		// ptid ↑ and if so we should take partition table from there
 		case r := <-recovery:
@@ -759,6 +764,7 @@ loop:
 			}
 			// XXX if m.partTab.OperationalWith(&.nodeTab, RUNNING) -> break (ok)
+		*/
 		case n := <-m.nodeLeave:
 			m.nodeTab.UpdateLinkDown(n.link)
@@ -814,9 +820,10 @@ loop:
 }
 // accept processes identification request of just connected node and either accepts or declines it.
-// if node identification is accepted nodeTab is updated and corresponding node entry is returned
-// XXX no need for ok
-func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg, ok bool) {
+// If node identification is accepted nodeTab is updated and corresponding node entry is returned.
+// The response message is constructed but not sent back, so as not to block the caller - it is
+// the caller's responsibility to send the response to the node which requested identification.
+func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg) {
 	// TODO log node accept/rejected
 	// XXX also verify ? :
@@ -824,7 +831,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg, ok bool) {
 	// - IdTimestamp ?
 	if n.idReq.ClusterName != m.node.ClusterName {
-		return nil, &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}, false
+		return nil, &neo.Error{neo.PROTOCOL_ERROR, "cluster name mismatch"}
 	}
 	nodeType := n.idReq.NodeType
@@ -839,7 +846,7 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg, ok bool) {
 	if node != nil {
 		// reject - uuid is already occupied by someone else
 		// XXX check also for down state - it could be the same node reconnecting
-		return nil, &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node" /*XXX*/}, false
+		return nil, &neo.Error{neo.PROTOCOL_ERROR, "uuid %v already used by another node" /*XXX*/}
 	}
 	// XXX accept only certain kind of nodes depending on .clusterState, e.g.
@@ -879,7 +886,25 @@ func (m *Master) accept(n nodeCome) (node *neo.Node, resp neo.Msg, ok bool) {
 	}
 	node = m.nodeTab.Update(nodeInfo, n.conn.Link()) // NOTE this notifies all nodeTab subscribers
-	return node, accept, true
+	return node, accept
+}
+
+// reject sends rejective identification response and closes associated link
+func (m *Master) reject(conn *neo.Conn, resp neo.Msg) {
+	// XXX cancel on ctx?
+	// XXX log?
+	err1 := conn.Send(resp)
+	err2 := conn.Close()
+	err3 := conn.Link().Close()
+	_ = xerr.Merge(err1, err2, err3) // XXX log it
+}
+
+// welcome sends acceptive identification response and closes conn
+func (m *Master) welcome(conn *neo.Conn, resp neo.Msg) error {
+	// XXX cancel on ctx
+	err1 := conn.Send(resp)
+	err2 := conn.Close()
+	return xerr.First(err1, err2)
 }
 // allocUUID allocates new node uuid for a node of kind nodeType
...
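The accept/reject/welcome split above keeps accept purely computational: it classifies the request, updates nodeTab on success, and returns the response message without touching the connection, so the event loop never blocks in accept; the per-connection goroutine then decides whether to reject (deliver the error and drop the connection) or welcome (deliver the accept). A compressed sketch of that caller-side flow with simplified stand-in types (conn/msg/node here are not the neo API, and the conn/link distinction of the real code is collapsed into a single conn):

package main

import "fmt"

type msg struct{ text string }

type conn struct{ peer string }

func (c *conn) Send(m msg) error { fmt.Printf("-> %s: %s\n", c.peer, m.text); return nil }
func (c *conn) Close() error     { fmt.Printf("close conn to %s\n", c.peer); return nil }

type node struct{ uuid int }

// accept only classifies the request and builds the response;
// it never writes to the connection itself.
func accept(c *conn, clusterName string) (*node, msg) {
	if clusterName != "test-cluster" {
		return nil, msg{"error: cluster name mismatch"}
	}
	return &node{uuid: 1}, msg{"accept: uuid=1"}
}

// reject: deliver the negative response, then drop the connection.
func reject(c *conn, resp msg) {
	_ = c.Send(resp)
	_ = c.Close()
}

// welcome: deliver the positive response and close the identification conn.
func welcome(c *conn, resp msg) error {
	err1 := c.Send(resp)
	err2 := c.Close()
	if err1 != nil {
		return err1
	}
	return err2
}

func main() {
	c := &conn{peer: "storage-1"}
	n, resp := accept(c, "test-cluster")
	if n == nil {
		reject(c, resp)
		return
	}
	if err := welcome(c, resp); err != nil {
		fmt.Println("welcome failed:", err)
		return
	}
	fmt.Println("node registered:", n.uuid)
}

In the commit this is what lets the recovery loop treat a nil node uniformly as "rejected" and otherwise proceed straight to storCtlRecovery.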