Commit 3fcd0178 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 474c67e4
......@@ -175,3 +175,42 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
return true
}
// ---- encode / decode PT to / from messages
// XXX naming
func (pt *PartitionTable) Dump() []RowInfo { // XXX also include .ptid? -> struct ?
rowv := make([]RowInfo, len(pt.PtTab))
for i, row := range pt.PtTab {
cellv := make([]CellInfo, len(row))
for j, cell := range cellv {
cellv[j] = CellInfo{NodeUUID: cell.NodeUUID, CellState: cell.CellState}
}
rowv[i] = RowInfo{Offset: uint32(i), CellList: cellv} // XXX cast?
}
return rowv
}
func PartTabFromDump(ptid PTid, rowv []RowInfo) *PartitionTable {
// reconstruct partition table from response
pt := &PartitionTable{}
for _, row := range rowv {
i := row.Offset
for i >= uint32(len(pt.PtTab)) {
pt.PtTab = append(pt.PtTab, []PartitionCell{})
}
//pt.PtTab[i] = append(pt.PtTab[i], row.CellList...)
for _, cell := range row.CellList {
pt.PtTab[i] = append(pt.PtTab[i], PartitionCell{
NodeUUID: cell.NodeUUID,
CellState: cell.CellState,
})
}
}
return pt
}
......@@ -51,8 +51,8 @@ type Master struct {
// to all nodes in cluster
///*
stateMu sync.RWMutex // XXX recheck: needed ?
nodeTab neo.NodeTable
partTab neo.PartitionTable
nodeTab *neo.NodeTable
partTab *neo.PartitionTable // XXX ^ is also in node
clusterState neo.ClusterState
//*/
clusterInfo neo.ClusterInfo
......@@ -96,6 +96,9 @@ func NewMaster(clusterName, serveAddr string, net xnet.Networker) *Master {
MasterAddr: serveAddr, // XXX ok?
},
nodeTab: &neo.NodeTable{},
partTab: &neo.PartitionTable{},
ctlStart: make(chan chan error),
ctlStop: make(chan chan struct{}),
ctlShutdown: make(chan chan error),
......@@ -248,7 +251,7 @@ func (m *Master) runMain(ctx context.Context) (err error) {
// storRecovery is result of 1 storage node passing recovery phase
type storRecovery struct {
stor *neo.Node
partTab neo.PartitionTable
partTab *neo.PartitionTable
err error
// XXX + backup_tid, truncate_tid ?
......@@ -335,7 +338,7 @@ loop:
}
// update indicator whether cluster currently can be operational or not
readyToStart = m.partTab.OperationalWith(&m.nodeTab) // XXX + node state
readyToStart = m.partTab.OperationalWith(m.nodeTab) // XXX + node state
// XXX handle case of new cluster - when no storage reports valid parttab
// XXX -> create new parttab
......@@ -438,23 +441,7 @@ func storCtlRecovery(ctx context.Context, stor *neo.Node, res chan storRecovery)
}
// reconstruct partition table from response
pt := neo.PartitionTable{}
pt.PTid = resp.PTid
for _, row := range resp.RowList {
i := row.Offset
for i >= uint32(len(pt.PtTab)) {
pt.PtTab = append(pt.PtTab, []neo.PartitionCell{})
}
//pt.PtTab[i] = append(pt.PtTab[i], row.CellList...)
for _, cell := range row.CellList {
pt.PtTab[i] = append(pt.PtTab[i], neo.PartitionCell{
NodeUUID: cell.NodeUUID,
CellState: cell.CellState,
})
}
}
pt := neo.PartTabFromDump(resp.PTid, resp.RowList)
res <- storRecovery{stor: stor, partTab: pt}
}
......@@ -526,7 +513,7 @@ loop:
m.nodeTab.SetNodeState(n.node, neo.DOWN)
// if cluster became non-operational - we cancel verification
if !m.partTab.OperationalWith(&m.nodeTab) {
if !m.partTab.OperationalWith(m.nodeTab) {
// XXX ok to instantly cancel? or better
// graceful shutdown in-flight verifications?
vcancel()
......@@ -547,7 +534,7 @@ loop:
// check partTab is still operational
// if not -> cancel to go back to recovery
if !m.partTab.OperationalWith(&m.nodeTab) {
if !m.partTab.OperationalWith(m.nodeTab) {
vcancel()
err = errClusterDegraded
break loop
......@@ -778,7 +765,7 @@ loop:
m.nodeTab.SetNodeState(n.node, neo.DOWN)
// if cluster became non-operational - cancel service
if !m.partTab.OperationalWith(&m.nodeTab) {
if !m.partTab.OperationalWith(m.nodeTab) {
err = errClusterDegraded
break loop
}
......
......@@ -315,12 +315,15 @@ func (stor *Storage) m1initialize(ctx context.Context, Mconn *neo.Conn) (err err
case *neo.Recovery:
err = Mconn.Send(&neo.AnswerRecovery{
PTid: 0, // XXX stub
PTid: stor.node.PartTab.PTid,
BackupTid: neo.INVALID_TID,
TruncateTid: neo.INVALID_TID})
case *neo.AskPartitionTable:
// TODO read and send M locally-saved PT (ptid, []PtRow)
// TODO initially read PT from disk
err = Mconn.Send(&neo.AnswerPartitionTable{
PTid: stor.node.PartTab.PTid,
RowList: stor.node.PartTab.Dump()})
case *neo.LockedTransactions:
// XXX r/o stub
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment