Commit ed777fda authored by Kirill Smelkov

.

parent d2ac0ad1
......@@ -574,7 +574,10 @@ func (m *Master) recovery(ctx context.Context) (err error) {
storv = append(storv, stor)
}
}
m.node.State.PartTab = xneo.MakePartTab(1 /* XXX hardcoded */, storv)
m.node.State.PartTab = xneo.MakePartTab(
1 /* np; XXX hardcoded */,
1 /*R; XXX hardcoded*/,
storv)
m.node.State.PartTab.PTid = 1
log.Infof(ctx, "creating new partition table: %s", m.node.State.PartTab)
}
......@@ -603,7 +606,15 @@ func storCtlRecovery(ctx context.Context, stor *_MasteredPeer) (_ *xneo.Partitio
}
// reconstruct partition table from response
pt := xneo.PartTabFromDump(resp.PTid, resp.RowList) // TODO handle resp.NumReplicas
// XXX AnswerPartitionTable and SendPartitionTable is exactly the same
// TODO -> better it be only 1 message
mpt := &proto.SendPartitionTable{
PTid: resp.PTid,
NumReplicas: resp.NumReplicas,
RowList: resp.RowList,
}
// NOTE NEO/py uses .NumReplicas as n(replica) - 1
pt := xneo.PartTabFromMsg(mpt)
return pt, nil
}
......@@ -789,11 +800,7 @@ func storCtlVerify(ctx context.Context, stor *_MasteredPeer, pt *xneo.PartitionT
lastTid = zodb.InvalidTid
// send just recovered parttab so storage saves it
err = slink.Send1(&proto.SendPartitionTable{
PTid: pt.PTid,
NumReplicas: 0, // FIXME hardcoded
RowList: pt.Dump(),
})
err = slink.Send1(pt.ToMsg())
if err != nil {
return
}
......
......@@ -180,7 +180,7 @@ func (node *_MasteredNode) talkMaster1(ctx, ctxPreTalkM context.Context, f func(
if err != nil {
return fmt.Errorf("after identification: expect partTab: %w", err)
}
pt = xneo.PartTabFromDump(mpt.PTid, mpt.RowList) // TODO handle mpt.NumReplicas
pt = xneo.PartTabFromMsg(&mpt)
log.Infof(ctx, "<- parttab:\n%s", pt)
}
......@@ -269,7 +269,7 @@ func (node *_MasteredNode) recvδstate(ctx context.Context, msg proto.Msg) (δpt
// <- whole partTab
case *proto.SendPartitionTable:
δpt = true
pt := xneo.PartTabFromDump(msg.PTid, msg.RowList) // FIXME handle msg.NumReplicas
pt := xneo.PartTabFromMsg(msg)
log.Infof(ctx, "<- parttab: %s", pt)
node.State.PartTab = pt
......
......@@ -168,11 +168,12 @@ func (stor *Storage) m1initialize1(ctx context.Context, req neonet.Request) erro
TruncateTid: proto.INVALID_TID})
case *proto.AskPartitionTable:
// TODO initially read PT from disk
pt := stor.node.State.PartTab // TODO initially read PT from disk
mpt := pt.ToMsg() // TODO merge AnswerPartitionTable and SendPartitionTable
err = req.Reply(&proto.AnswerPartitionTable{
PTid: stor.node.State.PartTab.PTid,
NumReplicas: 0, // FIXME hardcoded; NEO/py uses this as n(replica)-1
RowList: stor.node.State.PartTab.Dump()})
PTid: mpt.PTid,
NumReplicas: mpt.NumReplicas,
RowList: mpt.RowList})
case *proto.LockedTransactions:
// FIXME r/o stub
......
......@@ -251,19 +251,6 @@ func (p *PeerNode) dial(ctx context.Context) (_ *neonet.NodeLink, err error) {
case accept.YourNID != node.MyInfo.NID:
err = fmt.Errorf("connected, but peer gives us nid %s (our is %s)", accept.YourNID, node.MyInfo.NID)
// XXX PeerNode.Dial is currently used by Client only.
// XXX For Client it would be not correct to check #partition only at
// XXX connection time, but it has to be also checked after always as every
// XXX operation could coincide with cluster reconfiguration.
//
// FIXME for now we simply don't check N(p)
//
// XXX NumReplicas: neo/py meaning for n(replica) = `n(real-replica) - 1`
/*
case !(accept.NumPartitions == 1 && accept.NumReplicas == 0):
err = fmt.Errorf("connected but TODO peer works with !1x1 partition table.")
*/
}
if err != nil {
......
......@@ -117,6 +117,7 @@ import (
// PartitionTable zero value is valid empty partition table.
type PartitionTable struct {
// PTid identifies the version of this table.
// NOTE(review): "↑" presumably means the id only grows as the table changes — confirm.
PTid proto.PTid // ↑ for versioning
// r_ keeps the replica count minus one, so that the zero value of
// PartitionTable corresponds to a table with exactly 1 replica.
// Use R() to obtain the actual n(replica).
r_ int // n(replica) - 1; >= 0
// tab is indexed by partition id; tab[pid] lists the cells assigned
// to that partition. len(tab) is the number of partitions.
tab [][]Cell // [#Np] pid -> []Cell
}
......@@ -135,6 +136,10 @@ type Cell struct {
// data from here and here it will be removed)
}
// R reports n(replica) - the number of replicas the partition table is
// configured for.
//
// The table keeps this value internally as n(replica)-1, hence the +1 here;
// for a zero-value table R is 1.
func (pt *PartitionTable) R() int {
	nreplica := pt.r_ + 1
	return nreplica
}
// Get returns cells oid is associated with.
func (pt *PartitionTable) Get(oid zodb.Oid) []Cell {
if len(pt.tab) == 0 {
......@@ -153,25 +158,27 @@ func (c *Cell) Readable() bool {
return false
}
// MakePartTab creates new partition with uniformly distributed nodes
// MakePartTab creates new partition with uniformly distributed nodes.
// The partition table created will be of len=np
// FIXME R=1 hardcoded
// XXX nodev -> []NodeInfo ?
func MakePartTab(np int, nodev []*PeerNode) *PartitionTable {
// XXX stub, not tested
func MakePartTab(np int, nreplica int, nodev []*PeerNode) *PartitionTable {
if nreplica < 1 {
panic("nreplica too small")
}
// TODO tests
// TODO take nreplica into account
tab := make([][]Cell, np)
for i, j := 0, 0; i < np; i, j = i+1, j+1 % len(nodev) {
node := nodev[j]
// XXX assert node.State > DOWN
//fmt.Printf("tab[%d] <- %v\n", i, node.NID)
tab[i] = []Cell{{CellInfo: proto.CellInfo{node.NID, proto.UP_TO_DATE /*XXX ok?*/}}}
}
return &PartitionTable{tab: tab}
return &PartitionTable{PTid: 0, r_: nreplica-1, tab: tab}
}
// OperationalWith checks whether all object space is covered by at least some ready-to-serve nodes
// OperationalWith checks whether all object space is covered by at least some ready-to-serve nodes.
//
// for all partitions it checks both:
// - whether there are up-to-date entries in the partition table, and
......@@ -200,7 +207,7 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
//
// We leave it as is for now.
node := nt.Get(cell.NID)
if node == nil || node.State != proto.RUNNING { // XXX PENDING is also ok ?
if !(node != nil && node.State == proto.RUNNING) {
continue
}
......@@ -211,7 +218,6 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
if !ok {
return false
}
}
return true
......@@ -219,12 +225,11 @@ func (pt *PartitionTable) OperationalWith(nt *NodeTable) bool {
// ---- encode / decode PT to / from messages
// XXX naming
func (pt *PartitionTable) String() string {
buf := &bytes.Buffer{}
fmt.Fprintf(buf, "PT.%d[%d]\n", pt.PTid, len(pt.tab))
fmt.Fprintf(buf, "PT.%d/%d[%d]\n", pt.PTid, pt.R(), len(pt.tab))
for i, cellv := range pt.tab {
fmt.Fprintf(buf, "%d:\t", i)
if len(cellv) == 0 {
......@@ -241,8 +246,13 @@ func (pt *PartitionTable) String() string {
return buf.String()
}
// XXX -> RowList() ?
func (pt *PartitionTable) Dump() []proto.RowInfo { // XXX also include .ptid? -> struct ?
// ToMsg creates message that represents partition table state.
func (pt *PartitionTable) ToMsg() *proto.SendPartitionTable {
msg := &proto.SendPartitionTable{
PTid: pt.PTid,
NumReplicas: uint32(pt.R() - 1), // NOTE NEO/py treats NumReplicas as n(replica)-1
}
rowv := make([]proto.RowInfo, len(pt.tab))
for i, row := range pt.tab {
cellv := make([]proto.CellInfo, len(row))
......@@ -252,16 +262,19 @@ func (pt *PartitionTable) Dump() []proto.RowInfo { // XXX also include .ptid? ->
rowv[i] = proto.RowInfo{CellList: cellv}
}
return rowv
msg.RowList = rowv
return msg
}
// XXX +nreplica
func PartTabFromDump(ptid proto.PTid, rowv []proto.RowInfo) *PartitionTable {
// reconstruct partition table from response
pt := &PartitionTable{}
pt.PTid = ptid
// PartTabFromMsg creates partition table with state obtained from message.
func PartTabFromMsg(msg *proto.SendPartitionTable) *PartitionTable {
pt := &PartitionTable{
PTid: msg.PTid,
r_: int(msg.NumReplicas + 1 - 1), // NOTE NEO/py treates NumReplicas as n(replica)-1
}
for i, row := range rowv {
for i, row := range msg.RowList {
for i >= len(pt.tab) {
pt.tab = append(pt.tab, []Cell{})
}
......
......@@ -61,9 +61,7 @@ func (cs *ClusterState) Snapshot() *ClusterStateSnapshot {
S.NodeTab.IdTime = proto.IdTimeNone // XXX what here?
S.NodeTab.NodeList = nodeiv
S.PartTab.PTid = cs.PartTab.PTid
S.PartTab.NumReplicas = uint32(0) // FIXME hardcoded NumReplicas; NEO/py keeps this as n(replica)-1
S.PartTab.RowList = cs.PartTab.Dump()
S.PartTab = *cs.PartTab.ToMsg()
return S
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment