Commit ca0ffd62 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent a6684c9c
......@@ -87,9 +87,9 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
}
req := &RequestIdentification{
NodeType: n.MyInfo.NodeType,
NodeUUID: n.MyInfo.NodeUUID,
Address: n.MyInfo.Address,
NodeType: n.MyInfo.Type,
NodeUUID: n.MyInfo.UUID,
Address: n.MyInfo.Addr,
ClusterName: n.ClusterName,
IdTimestamp: n.MyInfo.IdTimestamp, // XXX ok?
}
......@@ -116,7 +116,7 @@ func (n *NodeCommon) Dial(ctx context.Context, peerType NodeType, addr string) (
// The node information about where it listens at is appropriately updated.
func (n *NodeCommon) Listen() (Listener, error) {
// start listening
ll, err := ListenLink(n.Net, n.MyInfo.Address.String())
ll, err := ListenLink(n.Net, n.MyInfo.Addr.String())
if err != nil {
return nil, err // XXX err ctx
}
......@@ -132,7 +132,7 @@ func (n *NodeCommon) Listen() (Listener, error) {
return nil, err // XXX err ctx
}
n.MyInfo.Address = addr
n.MyInfo.Addr = addr
l := &listener{
l: ll,
......
......@@ -45,9 +45,9 @@ import (
//
//
// XXX [] of
// .nodeUUID
// .nodeType
// .nodeState
// .UUID
// .Type
// .State
// .listenAt ip:port | ø // ø - if client or down(?)
//
// - - - - - - -
......@@ -103,7 +103,7 @@ type Node struct {
func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// FIXME linear scan
for _, node := range nt.nodev {
if node.NodeUUID == uuid {
if node.UUID == uuid {
return node
}
}
......@@ -115,7 +115,7 @@ func (nt *NodeTable) Get(uuid NodeUUID) *Node {
// Update updates information about a node
// it returns corresponding node entry for convenience
func (nt *NodeTable) Update(nodeInfo NodeInfo, conn *Conn /*XXX better link *NodeLink*/) *Node {
node := nt.Get(nodeInfo.NodeUUID)
node := nt.Get(nodeInfo.UUID)
if node == nil {
node = &Node{}
nt.nodev = append(nt.nodev, node)
......@@ -148,7 +148,7 @@ func (nt *NodeTable) GetByLink(link *NodeLink) *Node {
// XXX doc
func (nt *NodeTable) SetNodeState(node *Node, state NodeState) {
node.NodeState = state
node.State = state
traceNodeChanged(nt, node)
nt.notify(node.NodeInfo)
}
......@@ -173,7 +173,7 @@ func (nt *NodeTable) StorageList() []*Node {
// FIXME linear scan
sl := []*Node{}
for _, node := range nt.nodev {
if node.NodeType == STORAGE {
if node.Type == STORAGE {
sl = append(sl, node)
}
}
......@@ -189,7 +189,7 @@ func (nt *NodeTable) String() string {
// XXX also for .storv
for _, n := range nt.nodev {
// XXX recheck output
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.NodeUUID, n.NodeType, n.NodeState, n.Address)
fmt.Fprintf(&buf, "%s (%s)\t%s\t%s\n", n.UUID, n.Type, n.State, n.Addr)
}
return buf.String()
......
......@@ -40,12 +40,12 @@ package neo
//
// Given Np, R and []Storage PartitionTable tries to organize
//
// Pid -> []Storage
// pid -> []Storage
//
// mapping so that
//
// - redundancy level set by R is met
// - storages associated with adjacent Ptids are different
// - storages associated with adjacent pids are different
//
// when such organization is reached the partition table is called operational
// and non-operational otherwise. XXX and if storages are ready
......@@ -109,12 +109,12 @@ package neo
type PartitionTable struct {
// XXX do we need sync.Mutex here for updates ?
PtTab [][]PartitionCell // [#Np] XXX naming
tab [][]PartitionCell // [#Np] pid -> []Cell
PTid PTid // ↑ for versioning XXX -> ver ?
PTid PTid // ↑ for versioning XXX -> ver ? XXX move out of here?
}
// PartitionCell describes one storage in a ptid entry in partition table
// PartitionCell describes one storage in a pid entry in partition table
type PartitionCell struct {
NodeUUID
CellState
......@@ -131,8 +131,23 @@ type PartitionCell struct {
//
}
// MakePartTab creates new partition with uniformly distributed nodes
// The partition table created will be of len=np
// FIXME R=1 hardcoded
func MakePartTab(np int, nodev []*Node) *PartitionTable {
// XXX stub, not tested
tab := make([][]PartitionCell, np)
for i, j := 0, 0; i < np; i, j = i+1, j+1 % len(nodev) {
node := nodev[j]
// XXX assert node.State > DOWN
tab[i] = []PartitionCell{{node.UUID, UP_TO_DATE /*XXX ok?*/}}
}
return &PartitionTable{tab: tab}
}
// OperationalWith returns whether all object space is covered by at least some ready-to-serve nodes
// OperationalWith checks whether all object space is covered by at least some ready-to-serve nodes
//
// for all partitions it checks both:
// - whether there are up-to-date entries in the partition table, and
......@@ -142,7 +157,7 @@ type PartitionCell struct {
//
// XXX or keep not only NodeUUID in PartitionCell - add *Node ?
func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
for _, ptEntry := range pt.PtTab {
for _, ptEntry := range pt.tab {
if len(ptEntry) == 0 {
return false
}
......@@ -159,7 +174,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
//
// We leave it as is for now.
node := nt.Get(cell.NodeUUID)
if node == nil || node.NodeState != RUNNING { // XXX PENDING is also ok ?
if node == nil || node.State != RUNNING { // XXX PENDING is also ok ?
continue
}
......@@ -181,8 +196,8 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
// XXX naming
func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struct ?
rowv := make([]RowInfo, len(pt.PtTab))
for i, row := range pt.PtTab {
rowv := make([]RowInfo, len(pt.tab))
for i, row := range pt.tab {
cellv := make([]CellInfo, len(row))
for j, cell := range cellv {
cellv[j] = CellInfo{NodeUUID: cell.NodeUUID, CellState: cell.CellState}
......@@ -199,13 +214,13 @@ func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable {
for _, row := range rowv {
i := row.Offset
for i >= uint32(len(pt.PtTab)) {
pt.PtTab = append(pt.PtTab, []PartitionCell{})
for i >= uint32(len(pt.tab)) {
pt.tab = append(pt.tab, []PartitionCell{})
}
//pt.PtTab[i] = append(pt.PtTab[i], row.CellList...)
//pt.tab[i] = append(pt.tab[i], row.CellList...)
for _, cell := range row.CellList {
pt.PtTab[i] = append(pt.PtTab[i], PartitionCell{
pt.tab[i] = append(pt.tab[i], PartitionCell{
NodeUUID: cell.NodeUUID,
CellState: cell.CellState,
})
......
......@@ -232,11 +232,11 @@ func float64_NEODecode(b []byte) float64 {
// NodeInfo is information about a node
// XXX -> nodetab.go ?
type NodeInfo struct {
NodeType
Address // serving address
NodeUUID
NodeState
IdTimestamp float64 // FIXME clarify semantic where it is used
Type NodeType
Addr Address // serving address
UUID NodeUUID
State NodeState
IdTimestamp float64 // FIXME clarify semantic where it is used
}
// XXX -> parttab.go ?
......
......@@ -198,10 +198,10 @@ func TestMasterStorage(t *testing.T) {
return &traceNode{
NodeTab: unsafe.Pointer(nt),
NodeInfo: neo.NodeInfo{
NodeType: typ,
Address: xnaddr(laddr),
NodeUUID: neo.UUID(typ, num),
NodeState: state,
Type: typ,
Addr: xnaddr(laddr),
UUID: neo.UUID(typ, num),
State: state,
IdTimestamp: idtstamp,
},
}
......@@ -284,16 +284,15 @@ func TestMasterStorage(t *testing.T) {
// M ready to start: new cluster, no in-progress S recovery
tc.Expect(masterStartReady(M, true))
// M <- start cmd
err := M.Start()
exc.Raiseif(err)
// XXX M.partTab = ø
// XXX M.partTab <- S1
// XXX M can start -> writes parttab to S and goes to verification
// XXX M.partTab <- ...
// XXX updated something cluster currently can be operational
// XXX temp
return
......
......@@ -93,7 +93,7 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
m := &Master{
node: neo.NodeCommon{
MyInfo: neo.NodeInfo{NodeType: neo.MASTER, Address: addr},
MyInfo: neo.NodeInfo{Type: neo.MASTER, Addr: addr},
ClusterName: clusterName,
Net: net,
MasterAddr: serveAddr, // XXX ok?
......@@ -169,10 +169,10 @@ func (m *Master) Run(ctx context.Context) (err error) {
}
m.node.MyInfo = neo.NodeInfo{
NodeType: neo.MASTER,
Address: naddr,
NodeUUID: m.allocUUID(neo.MASTER),
NodeState: neo.RUNNING,
Type: neo.MASTER,
Addr: naddr,
UUID: m.allocUUID(neo.MASTER),
State: neo.RUNNING,
IdTimestamp: 0, // XXX ok?
}
......@@ -301,7 +301,7 @@ func (m *Master) recovery(ctx context.Context) (err error) {
// start recovery on all storages we are currently in touch with
// XXX close links to clients
for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
wg.Add(1)
go func() {
......@@ -377,7 +377,7 @@ loop:
// recovery and there is no in-progress recovery running
nup := 0
for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN {
if stor.State > neo.DOWN {
nup++
}
}
......@@ -434,6 +434,7 @@ loop:
close(done)
}()
loop2:
for {
select {
case r := <-recovery:
......@@ -442,14 +443,27 @@ loop:
if !xcontext.Canceled(errors.Cause(r.err)) {
// XXX not so ok
// log / close node link; update NT
// FIXME log / close node link; update NT
}
case <-done:
return err
break loop2
}
}
// if we are starting for new cluster - create partition table
if err != nil && m.partTab.PTid == 0 {
// XXX -> m.nodeTab.StorageList(State > DOWN)
storv := []*neo.Node{}
for _, stor := range m.nodeTab.StorageList() {
if stor.State > neo.DOWN {
storv = append(storv, stor)
}
}
m.partTab = neo.MakePartTab(1 /* XXX hardcoded */, storv)
}
return err
}
// storCtlRecovery drives a storage node during cluster recovering state
......@@ -537,7 +551,7 @@ func (m *Master) verify(ctx context.Context) (err error) {
// start verification on all storages we are currently in touch with
for _, stor := range m.nodeTab.StorageList() {
if stor.NodeState > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
if stor.State > neo.DOWN { // XXX state cmp ok ? XXX or stor.Link != nil ?
inprogress++
go storCtlVerify(vctx, stor, verify)
}
......@@ -721,7 +735,7 @@ loop:
// XXX
}
switch node.NodeType {
switch node.Type {
case STORAGE:
switch state {
case ClusterRecovery:
......@@ -918,7 +932,7 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
accept := &neo.AcceptIdentification{
NodeType: neo.MASTER,
MyNodeUUID: m.node.MyInfo.NodeUUID,
MyNodeUUID: m.node.MyInfo.UUID,
NumPartitions: 1, // FIXME hardcoded
NumReplicas: 1, // FIXME hardcoded
YourNodeUUID: uuid,
......@@ -936,10 +950,10 @@ func (m *Master) identify(ctx context.Context, n nodeCome) (node *neo.Node, resp
}
nodeInfo := neo.NodeInfo{
NodeType: nodeType,
Address: n.idReq.Address,
NodeUUID: uuid,
NodeState: nodeState,
Type: nodeType,
Addr: n.idReq.Address,
UUID: uuid,
State: nodeState,
IdTimestamp: m.monotime(),
}
......
......@@ -67,7 +67,7 @@ func NewStorage(cluster, masterAddr, serveAddr string, net xnet.Networker, zstor
stor := &Storage{
node: neo.NodeCommon{
MyInfo: neo.NodeInfo{NodeType: neo.STORAGE, Address: addr},
MyInfo: neo.NodeInfo{Type: neo.STORAGE, Addr: addr},
ClusterName: cluster,
Net: net,
MasterAddr: masterAddr,
......@@ -193,9 +193,9 @@ func (stor *Storage) talkMaster1(ctx context.Context) (err error) {
}
// XXX -> node.Dial ?
if accept.YourNodeUUID != stor.node.MyInfo.NodeUUID {
if accept.YourNodeUUID != stor.node.MyInfo.UUID {
log.Infof(ctx, "master told us to have uuid=%v", accept.YourNodeUUID)
stor.node.MyInfo.NodeUUID = accept.YourNodeUUID
stor.node.MyInfo.UUID = accept.YourNodeUUID
}
......
......@@ -64,21 +64,21 @@ func (*NodeInfo) neoMsgCode() uint16 {
}
func (p *NodeInfo) neoMsgEncodedLen() int {
return 26 + len(p.Address.Host)
return 26 + len(p.Addr.Host)
}
func (p *NodeInfo) neoMsgEncode(data []byte) {
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.NodeType)))
binary.BigEndian.PutUint32(data[0:], uint32(int32(p.Type)))
{
l := uint32(len(p.Address.Host))
l := uint32(len(p.Addr.Host))
binary.BigEndian.PutUint32(data[4:], l)
data = data[8:]
copy(data, p.Address.Host)
copy(data, p.Addr.Host)
data = data[l:]
}
binary.BigEndian.PutUint16(data[0:], p.Address.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32(p.NodeUUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32(p.NodeState)))
binary.BigEndian.PutUint16(data[0:], p.Addr.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32(p.UUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32(p.State)))
float64_NEOEncode(data[10:], p.IdTimestamp)
}
......@@ -87,7 +87,7 @@ func (p *NodeInfo) neoMsgDecode(data []byte) (int, error) {
if uint32(len(data)) < 8 {
goto overflow
}
p.NodeType = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
p.Type = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
{
l := binary.BigEndian.Uint32(data[4:])
data = data[8:]
......@@ -95,12 +95,12 @@ func (p *NodeInfo) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
nread += 18 + l
p.Address.Host = string(data[:l])
p.Addr.Host = string(data[:l])
data = data[l:]
}
p.Address.Port = binary.BigEndian.Uint16(data[0:])
p.NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
p.NodeState = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
p.Addr.Port = binary.BigEndian.Uint16(data[0:])
p.UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
p.State = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
p.IdTimestamp = float64_NEODecode(data[10:])
return 8 + int(nread), nil
......@@ -2546,7 +2546,7 @@ func (p *AnswerNodeList) neoMsgEncodedLen() int {
var size int
for i := 0; i < len(p.NodeList); i++ {
a := &p.NodeList[i]
size += len((*a).Address.Host)
size += len((*a).Addr.Host)
}
return 4 + len(p.NodeList)*26 + size
}
......@@ -2558,17 +2558,17 @@ func (p *AnswerNodeList) neoMsgEncode(data []byte) {
data = data[4:]
for i := 0; uint32(i) < l; i++ {
a := &p.NodeList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeType)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).Type)))
{
l := uint32(len((*a).Address.Host))
l := uint32(len((*a).Addr.Host))
binary.BigEndian.PutUint32(data[4:], l)
data = data[8:]
copy(data, (*a).Address.Host)
copy(data, (*a).Addr.Host)
data = data[l:]
}
binary.BigEndian.PutUint16(data[0:], (*a).Address.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32((*a).NodeState)))
binary.BigEndian.PutUint16(data[0:], (*a).Addr.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32((*a).State)))
float64_NEOEncode(data[10:], (*a).IdTimestamp)
data = data[18:]
}
......@@ -2589,7 +2589,7 @@ func (p *AnswerNodeList) neoMsgDecode(data []byte) (int, error) {
if uint32(len(data)) < 8 {
goto overflow
}
(*a).NodeType = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
(*a).Type = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
{
l := binary.BigEndian.Uint32(data[4:])
data = data[8:]
......@@ -2597,12 +2597,12 @@ func (p *AnswerNodeList) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
nread += 18 + l
(*a).Address.Host = string(data[:l])
(*a).Addr.Host = string(data[:l])
data = data[l:]
}
(*a).Address.Port = binary.BigEndian.Uint16(data[0:])
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
(*a).NodeState = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
(*a).Addr.Port = binary.BigEndian.Uint16(data[0:])
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
(*a).State = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
(*a).IdTimestamp = float64_NEODecode(data[10:])
data = data[18:]
}
......@@ -2747,7 +2747,7 @@ func (p *NotifyNodeInformation) neoMsgEncodedLen() int {
var size int
for i := 0; i < len(p.NodeList); i++ {
a := &p.NodeList[i]
size += len((*a).Address.Host)
size += len((*a).Addr.Host)
}
return 12 + len(p.NodeList)*26 + size
}
......@@ -2760,17 +2760,17 @@ func (p *NotifyNodeInformation) neoMsgEncode(data []byte) {
data = data[12:]
for i := 0; uint32(i) < l; i++ {
a := &p.NodeList[i]
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).NodeType)))
binary.BigEndian.PutUint32(data[0:], uint32(int32((*a).Type)))
{
l := uint32(len((*a).Address.Host))
l := uint32(len((*a).Addr.Host))
binary.BigEndian.PutUint32(data[4:], l)
data = data[8:]
copy(data, (*a).Address.Host)
copy(data, (*a).Addr.Host)
data = data[l:]
}
binary.BigEndian.PutUint16(data[0:], (*a).Address.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32((*a).NodeUUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32((*a).NodeState)))
binary.BigEndian.PutUint16(data[0:], (*a).Addr.Port)
binary.BigEndian.PutUint32(data[2:], uint32(int32((*a).UUID)))
binary.BigEndian.PutUint32(data[6:], uint32(int32((*a).State)))
float64_NEOEncode(data[10:], (*a).IdTimestamp)
data = data[18:]
}
......@@ -2792,7 +2792,7 @@ func (p *NotifyNodeInformation) neoMsgDecode(data []byte) (int, error) {
if uint32(len(data)) < 8 {
goto overflow
}
(*a).NodeType = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
(*a).Type = NodeType(int32(binary.BigEndian.Uint32(data[0:])))
{
l := binary.BigEndian.Uint32(data[4:])
data = data[8:]
......@@ -2800,12 +2800,12 @@ func (p *NotifyNodeInformation) neoMsgDecode(data []byte) (int, error) {
goto overflow
}
nread += 18 + l
(*a).Address.Host = string(data[:l])
(*a).Addr.Host = string(data[:l])
data = data[l:]
}
(*a).Address.Port = binary.BigEndian.Uint16(data[0:])
(*a).NodeUUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
(*a).NodeState = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
(*a).Addr.Port = binary.BigEndian.Uint16(data[0:])
(*a).UUID = NodeUUID(int32(binary.BigEndian.Uint32(data[2:])))
(*a).State = NodeState(int32(binary.BigEndian.Uint32(data[6:])))
(*a).IdTimestamp = float64_NEODecode(data[10:])
data = data[18:]
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment