Commit 7efd3012 authored by Kirill Smelkov

.

parent 39bd3660
......@@ -27,6 +27,8 @@ import (
"math"
"os"
"sync"
"../zodb"
)
// Master is the node overseeing and managing how the whole NEO cluster works
......@@ -34,6 +36,11 @@ type Master struct {
clusterName string
nodeUUID NodeUUID
// last allocated oid & tid
// XXX how to start allocating oid from 0, not 1 ?
lastOid zodb.Oid
lastTid zodb.Tid
// master manages node and partition tables and broadcasts their updates
// to all nodes in the cluster
stateMu sync.RWMutex // XXX recheck: needed ?
......@@ -48,8 +55,6 @@ type Master struct {
// channels from various workers to main driver
nodeCome chan nodeCome // node connected
nodeLeave chan nodeLeave // node disconnected
storRecovery chan storRecovery // storage node passed recovery XXX better explicitly pass to worker as arg?
}
type ctlStart struct {
......@@ -75,17 +80,9 @@ type nodeLeave struct {
// XXX TODO
}
// storage node passed recovery phase
type storRecovery struct {
partTab PartitionTable
// XXX + lastOid, lastTid, backup_tid, truncate_tid ?
// XXX + err ?
}
func NewMaster(clusterName string) *Master {
m := &Master{clusterName: clusterName}
m.clusterState = RECOVERING // XXX no elections - we are the only master
m.clusterState = ClusterRecovering // XXX no elections - we are the only master
go m.run(context.TODO()) // XXX ctx
return m
......@@ -95,21 +92,23 @@ func NewMaster(clusterName string) *Master {
// XXX NotifyNodeInformation to all nodes whenever nodetab changes
// XXX -> Start(), Stop()
/*
func (m *Master) SetClusterState(state ClusterState) error {
ch := make(chan error)
m.ctlState <- ctlState{state, ch}
return <-ch
}
*/
// run implements the main master cluster-management logic: node tracking, cluster
// state updates, scheduling data movement between storage nodes, etc.
func (m *Master) run(ctx context.Context) {
// current function to ask/control a storage depending on current cluster state
// + associated context covering all storage nodes
// XXX + waitgroup ?
storCtl := m.storCtlRecovery
storCtlCtx, storCtlCancel := context.WithCancel(ctx)
// // current function to ask/control a storage depending on current cluster state
// // + associated context covering all storage nodes
// // XXX + waitgroup ?
// storCtl := m.storCtlRecovery
// storCtlCtx, storCtlCancel := context.WithCancel(ctx)
for {
select {
......@@ -118,7 +117,7 @@ func (m *Master) run(ctx context.Context) {
// command to start cluster
case c := <-m.ctlStart:
if m.clusterState != ClusterRecovery {
if m.clusterState != ClusterRecovering {
// start possible only from recovery
// XXX err ctx
c.resp <- fmt.Errorf("start: inappropriate current state: %v", m.clusterState)
......@@ -126,7 +125,7 @@ func (m *Master) run(ctx context.Context) {
}
// check preconditions for start
if !m.partTab.OperationalWith(m.nodeTab) {
if !m.partTab.OperationalWith(&m.nodeTab) {
// XXX err ctx
// TODO + how much % PT is covered
c.resp <- fmt.Errorf("start: non-operational partition table")
......@@ -139,9 +138,10 @@ func (m *Master) run(ctx context.Context) {
// command to stop cluster
case c := <-m.ctlStop:
case <-m.ctlStop:
// TODO
/*
// node connects & requests identification
case n := <-m.nodeCome:
nodeInfo, ok := m.accept(n)
......@@ -173,120 +173,67 @@ func (m *Master) run(ctx context.Context) {
}
// XXX consider clusterState change
*/
}
}
_ = storCtlCancel // XXX
}
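For orientation, run() is a single-goroutine event loop: all cluster state is owned by that goroutine and mutated only in response to messages received via select. A minimal sketch of the pattern, with illustrative channel types rather than the real ones:

func eventLoop(ctx context.Context, ctlStart chan chan error, nodeEvents chan NodeInfo) {
	for {
		select {
		case <-ctx.Done():
			return // master is asked to shut down

		case resp := <-ctlStart:
			resp <- nil // handle "start cluster" command; reply with error status

		case n := <-nodeEvents:
			_ = n // update nodeTab/partTab and notify subscribers
		}
	}
}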
// recovery is the process that drives the cluster through the recovery phase
//
// XXX draft: Cluster Recovery if []Stor is fixed
// NOTE during recovery phase `recovery()` owns m.partTab
// XXX what about .nodeTab ?
func (m *Master) recovery(ctx context.Context, storv []*NodeLink) {
recovery := make(chan storRecovery)
wg := sync.WaitGroup{}
//wg := sync.WaitGroup{}
inprogress := 0
for _, stor := range storv {
wg.Add(1)
go storCtlRecovery(ctx, wg, stor, recovery)
//wg.Add(1)
inprogress++
go storCtlRecovery(ctx, stor, recovery)
}
loop:
for {
// XXX really inprogress > 0 ? (we should be here indefinitely until commanded to start)
for inprogress > 0 {
select {
case <-ctx.Done():
// XXX
break loop
case r := <-recovery:
inprogress--
if r.partTab.ptid > m.partTab.ptid {
m.partTab = r.partTab
// XXX also transfer subscribers ?
// XXX -> during recovery no one must be subscribed to partTab
}
// TODO
// XXX another channel from master: request "ok to start?" - if ok we reply ok and exit
// if not ok - we just reply not ok
}
}
// XXX consume remaining recovery responses
wg.Wait()
//wg.Wait()
}
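recovery() deliberately uses a plain inprogress counter instead of the commented-out sync.WaitGroup: the collector has to receive results and watch ctx.Done() at the same time, which wg.Wait() cannot do. The counter idiom in isolation (names here are illustrative):

func collect(ctx context.Context, results <-chan storRecovery, inprogress int) {
	for inprogress > 0 {
		select {
		case <-ctx.Done():
			// cancelled: the remaining workers will still send one reply
			// each, and someone must consume them (cf. the drain loop in
			// verify() below) or they block forever on the unbuffered channel
			return
		case r := <-results:
			inprogress--
			_ = r // merge r.partTab, handle r.err
		}
	}
}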
// accept processes the identification request of a just-connected node and either accepts or declines it.
// If node identification is accepted, nodeTab is updated and the corresponding nodeInfo is returned.
func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return
}
nodeType := n.idReq.NodeType
uuid := n.idReq.NodeUUID
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return
}
// XXX accept only certain kinds of nodes depending on .clusterState, e.g.
switch nodeType {
case CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"}
// XXX ...
}
n.idResp <- &AcceptIdentification{
NodeType: MASTER,
MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
}
// update nodeTab
var nodeState NodeState
switch nodeType {
case STORAGE:
// FIXME py sets to RUNNING/PENDING depending on cluster state
nodeState = PENDING
default:
nodeState = RUNNING
}
nodeInfo = NodeInfo{
NodeType: nodeType,
Address: n.idReq.Address,
NodeUUID: uuid,
NodeState: nodeState,
IdTimestamp: monotime(),
}
m.nodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
// storRecovery is the result of a storage node passing the recovery phase
type storRecovery struct {
partTab PartitionTable
// XXX + lastOid, lastTid, backup_tid, truncate_tid ?
return nodeInfo, true
err error
}
// storCtlRecovery drives a storage node during the cluster recovering state
// TODO text
func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
func storCtlRecovery(ctx context.Context, link *NodeLink, res chan storRecovery) {
var err error
defer func() {
if err == nil {
......@@ -294,12 +241,15 @@ func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
}
// XXX on err still provide feedback to storRecovery chan ?
res <- storRecovery{err: err}
/*
fmt.Printf("master: %v", err)
// this must interrupt everything connected to stor node and
// thus eventually result in nodeLeave event to main driver
link.Close()
*/
}()
defer errcontextf(&err, "%s: stor recovery", link)
......@@ -339,39 +289,202 @@ func (m *Master) storCtlRecovery(ctx context.Context, link *NodeLink) {
}
}
m.storRecovery <- storRecovery{partTab: pt}
res <- storRecovery{partTab: pt}
}
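Note the shape of the error handling above, assuming the elided defer body returns early when err == nil (consistent with the hunk): exactly one storRecovery message leaves the worker - from the defer on failure, from the function body on success - so the collector's in-flight counter stays balanced. Reduced:

func storWorker(res chan<- storRecovery, ask func() (PartitionTable, error)) {
	var err error
	defer func() {
		if err != nil {
			res <- storRecovery{err: err} // failure: report via defer
		}
	}()

	pt, err := ask() // stand-in for the Recovery/AnswerRecovery round-trip
	if err != nil {
		return
	}
	res <- storRecovery{partTab: pt} // success: report from the body
}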
// verify is the process that drives the cluster through the verification phase
//
// prerequisite for start: .partTab is operational wrt .nodeTab
//
// XXX draft: Cluster Verify if []Stor is fixed
func (m *Master) verify(ctx context.Context, storv []*NodeLink) error {
// XXX ask every storage to verify and wait for _all_ of them to complete?
var err error
verify := make(chan storVerify)
vctx, vcancel := context.WithCancel(ctx)
defer vcancel()
inprogress := 0
// XXX do we need to reset m.lastOid / m.lastTid to 0 in the beginning?
for _, stor := range storv {
inprogress++
go storCtlVerify(vctx, stor, verify)
}
loop:
for inprogress > 0 {
select {
case <-ctx.Done():
err = ctx.Err()
break loop
case v := <-verify:
inprogress--
if v.err != nil {
fmt.Printf("master: %v\n", v.err) // XXX err ctx
// XXX mark S as non-working in nodeTab
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.partTab.OperationalWith(&m.nodeTab) {
vcancel()
err = fmt.Errorf("cluster became non-operational in the process")
break loop
}
} else {
if v.lastOid > m.lastOid {
m.lastOid = v.lastOid
}
if v.lastTid > m.lastTid {
m.lastTid = v.lastTid
}
}
}
}
if err != nil {
fmt.Printf("master: verify: %v\n", err)
// consume remaining verify responses (they should come without delay since the context was cancelled)
for ; inprogress > 0; inprogress-- {
<-verify
}
}
// XXX -> return via channel ?
return err
}
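One detail worth keeping: after vcancel() each outstanding storCtlVerify still owes exactly one reply on the unbuffered verify channel, and the drain loop above is what keeps those goroutines from blocking forever. In isolation, and assuming workers do observe cancellation promptly:

func drainVerify(results <-chan storVerify, inprogress int) {
	for ; inprogress > 0; inprogress-- {
		<-results // each cancelled worker still sends one (error) reply
	}
}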
// storVerify is the result of a storage node passing the verification phase
type storVerify struct {
lastOid zodb.Oid
lastTid zodb.Tid
err error
}
// storCtlVerify drives a storage node during the cluster verifying (= starting) state
// XXX does this need to be a member on Master ?
func (m *Master) storCtlVerify(ctx context.Context, link *NodeLink) {
func storCtlVerify(ctx context.Context, link *NodeLink, res chan storVerify) {
// XXX err context + link.Close on err
// XXX cancel on ctx
var err error
defer func() {
if err != nil {
res <- storVerify{err: err}
}
}()
defer errcontextf(&err, "%s: verify", link)
// FIXME stub
conn, _ := link.NewConn()
locked := AnswerLockedTransactions{}
err := Ask(&LockedTransactions, &locked)
err = Ask(conn, &LockedTransactions{}, &locked)
if err != nil {
return // XXX err
return
}
if len(locked.TidDict) {
if len(locked.TidDict) > 0 {
// TODO vvv
panic(fmt.Sprintf("non-ø locked txns in verify: %v", locked.TidDict))
err = fmt.Errorf("TODO: non-ø locked txns: %v", locked.TidDict)
return
}
last := AnswerLastIDs{}
err = Ask(&LastIDs, &last)
err = Ask(conn, &LastIDs{}, &last)
if err != nil {
return // XXX err
return
}
// XXX send this to driver (what to do with them ?) -> use for
// - oid allocations
// - next tid allocations etc
last.LastOID
last.LastTID
// send results to driver
res <- storVerify{lastOid: last.LastOid, lastTid: last.LastTid}
}
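The deferred errcontextf call takes a pointer to the named err variable, and since defers run LIFO, it decorates err before the earlier defer sends it to res, so every return path gets the same context prefix. The helper itself is not part of this diff; a plausible shape, inferred from how it is called:

// plausible errcontextf (real implementation lives outside this diff):
// prepend formatted context to *errp if it carries an error
func errcontextf(errp *error, format string, argv ...interface{}) {
	if *errp == nil {
		return
	}
	format += ": %s"
	argv = append(argv, *errp)
	*errp = fmt.Errorf(format, argv...)
}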
// XXX draft: Cluster Running if []Stor is fixed
func (m *Master) runxxx(ctx context.Context, storv []*NodeLink) {
// TODO
}
// XXX draft: Cluster Stopping if []Stor is fixed
func (m *Master) stop(ctx context.Context, storv []*NodeLink) {
// TODO
}
// accept processes the identification request of a just-connected node and either accepts or declines it.
// If node identification is accepted, nodeTab is updated and the corresponding nodeInfo is returned.
func (m *Master) accept(n nodeCome) (nodeInfo NodeInfo, ok bool) {
// XXX also verify ? :
// - NodeType valid
// - IdTimestamp ?
if n.idReq.ClusterName != m.clusterName {
n.idResp <- &Error{PROTOCOL_ERROR, "cluster name mismatch"} // XXX
return
}
nodeType := n.idReq.NodeType
uuid := n.idReq.NodeUUID
if uuid == 0 {
uuid = m.allocUUID(nodeType)
}
// XXX uuid < 0 (temporary) -> reallocate if conflict ?
node := m.nodeTab.Get(uuid)
if node != nil {
// reject - uuid is already occupied by someone else
// XXX check also for down state - it could be the same node reconnecting
n.idResp <- &Error{PROTOCOL_ERROR, "uuid %v already used by another node"} // XXX
return
}
// XXX accept only certain kinds of nodes depending on .clusterState, e.g.
switch nodeType {
case CLIENT:
n.idResp <- &Error{NOT_READY, "cluster not operational"}
// XXX ...
}
n.idResp <- &AcceptIdentification{
NodeType: MASTER,
MyNodeUUID: m.nodeUUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
}
// update nodeTab
var nodeState NodeState
switch nodeType {
case STORAGE:
// FIXME py sets to RUNNING/PENDING depending on cluster state
nodeState = PENDING
default:
nodeState = RUNNING
}
nodeInfo = NodeInfo{
NodeType: nodeType,
Address: n.idReq.Address,
NodeUUID: uuid,
NodeState: nodeState,
IdTimestamp: monotime(),
}
m.nodeTab.Update(nodeInfo) // NOTE this notifies all nodeTab subscribers
return nodeInfo, true
}
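accept replies through the channel carried inside the request itself: each nodeCome brings its own idResp, so a single acceptor goroutine can serve many concurrent connections without mixing up answers. The generic one-shot reply pattern this assumes (names illustrative):

type request struct {
	payload string
	resp    chan string // carries exactly one answer back to the requester
}

func serve(requests chan request) {
	for req := range requests {
		req.resp <- "handled: " + req.payload // reply exactly once per request
	}
}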
// allocUUID allocates a new node uuid for a node of kind nodeType
// XXX it is a bad idea for the master to assign a uuid to a connecting node
// -> better: nodes generate really unique UUIDs themselves and always present themselves with them
......
......@@ -130,15 +130,16 @@ type PartitionCell struct {
}
// Operational returns whether all object space is covered by at least some ready-to-serve nodes
// NOTE XXX operational here means only that pt itself is operational
// for the cluster to be really operational it has to be checked whether
// the nodes referenced by pt are up and running
// OperationalWith returns whether all object space is covered by at least some ready-to-serve nodes
//
// XXX or keep not only NodeUUID in PartitionCell - add *Node ?
// for all partitions it checks both:
// - whether there are up-to-date entries in the partition table, and
// - whether there are corresponding storage nodes that are up
//
// information about nodes being up or down is obtained from the supplied NodeTable
//
// XXX -> add `nt *NodeTable` as argument and check real node states there ?
func (pt *PartitionTable) Operational() bool {
// XXX or keep not only NodeUUID in PartitionCell - add *Node ?
func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
for _, ptEntry := range pt.ptTab {
if len(ptEntry) == 0 {
return false
......@@ -149,6 +150,12 @@ func (pt *PartitionTable) Operational() bool {
for _, cell := range ptEntry {
switch cell.CellState {
case UP_TO_DATE, FEEDING: // XXX cell.isReadable in py
// cell says it is readable. let's check whether the corresponding node is up
node := nt.Get(cell.NodeUUID)
if node == nil || node.Info.NodeState != RUNNING { // XXX PENDING is also ok ?
continue
}
ok = true
break cellLoop
}
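Usage mirrors the start precondition checked in Master.run() earlier in this commit:

// sketch: refuse to start until the partition table is operational
// with respect to the current node table
func checkStartable(m *Master) error {
	if !m.partTab.OperationalWith(&m.nodeTab) {
		return fmt.Errorf("start: non-operational partition table")
	}
	return nil
}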
......
......@@ -448,8 +448,8 @@ func (p *AnswerRecovery) NEOEncodedInfo() (uint16, int) {
func (p *AnswerRecovery) NEOEncode(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.PTid))
binary.BigEndian.PutUint64(data[8:], uint64(p.BackupTID))
binary.BigEndian.PutUint64(data[16:], uint64(p.TruncateTID))
binary.BigEndian.PutUint64(data[8:], uint64(p.BackupTid))
binary.BigEndian.PutUint64(data[16:], uint64(p.TruncateTid))
}
func (p *AnswerRecovery) NEODecode(data []byte) (int, error) {
......@@ -457,8 +457,8 @@ func (p *AnswerRecovery) NEODecode(data []byte) (int, error) {
goto overflow
}
p.PTid = PTid(binary.BigEndian.Uint64(data[0:]))
p.BackupTID = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
p.TruncateTID = zodb.Tid(binary.BigEndian.Uint64(data[16:]))
p.BackupTid = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
p.TruncateTid = zodb.Tid(binary.BigEndian.Uint64(data[16:]))
return 24, nil
overflow:
......@@ -485,16 +485,16 @@ func (p *AnswerLastIDs) NEOEncodedInfo() (uint16, int) {
}
func (p *AnswerLastIDs) NEOEncode(data []byte) {
binary.BigEndian.PutUint64(data[0:], uint64(p.LastOID))
binary.BigEndian.PutUint64(data[8:], uint64(p.LastTID))
binary.BigEndian.PutUint64(data[0:], uint64(p.LastOid))
binary.BigEndian.PutUint64(data[8:], uint64(p.LastTid))
}
func (p *AnswerLastIDs) NEODecode(data []byte) (int, error) {
if uint32(len(data)) < 16 {
goto overflow
}
p.LastOID = zodb.Oid(binary.BigEndian.Uint64(data[0:]))
p.LastTID = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
p.LastOid = zodb.Oid(binary.BigEndian.Uint64(data[0:]))
p.LastTid = zodb.Tid(binary.BigEndian.Uint64(data[8:]))
return 16, nil
overflow:
......
......@@ -308,8 +308,8 @@ type Recovery struct {
type AnswerRecovery struct {
PTid
BackupTID zodb.Tid
TruncateTID zodb.Tid
BackupTid zodb.Tid
TruncateTid zodb.Tid
}
// Ask the last OID/TID so that a master can initialize its TransactionManager.
......@@ -318,8 +318,8 @@ type LastIDs struct {
}
type AnswerLastIDs struct {
LastOID zodb.Oid
LastTID zodb.Tid
LastOid zodb.Oid
LastTid zodb.Tid
}
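All the generated codecs above follow the same fixed-layout big-endian scheme. A hand-written round trip for the 16-byte AnswerLastIDs wire format (raw uint64 instead of zodb.Oid/zodb.Tid to keep the sketch self-contained; uses encoding/binary and fmt):

func encodeLastIDs(oid, tid uint64) []byte {
	data := make([]byte, 16)
	binary.BigEndian.PutUint64(data[0:], oid) // LastOid at offset 0
	binary.BigEndian.PutUint64(data[8:], tid) // LastTid at offset 8
	return data
}

func decodeLastIDs(data []byte) (oid, tid uint64, err error) {
	if len(data) < 16 {
		return 0, 0, fmt.Errorf("lastIDs: data overflow") // cf. goto overflow above
	}
	return binary.BigEndian.Uint64(data[0:]), binary.BigEndian.Uint64(data[8:]), nil
}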
// Ask the full partition table. PM -> S.
......