Commit 464169c1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent f1f995cc
...@@ -180,12 +180,13 @@ type ΔBtail struct { ...@@ -180,12 +180,13 @@ type ΔBtail struct {
// trackIdx describes @head state // trackIdx describes @head state
trackIdx trackIndex trackIdx trackIndex
// tree(s) subset that was requested to be tracked, but for which vδB was not yet rebuilt
trackNew trackIndex
// XXX root -> tracked holes // XXX root -> tracked holes
// XXX move -> ΔTtail ? // XXX move -> ΔTtail ?
holeIdxByRoot map[zodb.Oid]treeSetKey holeIdxByRoot map[zodb.Oid]treeSetKey
// // tracked objects that are not yet taken into account in current δBtail
// trackNew SetOid
} }
...@@ -265,6 +266,91 @@ func (tidx trackIndex) gc1(oid zodb.Oid) { ...@@ -265,6 +266,91 @@ func (tidx trackIndex) gc1(oid zodb.Oid) {
} }
} }
// verify verifies internal consistency of tidx.
func (tidx trackIndex) verify() {
// XXX !debug -> return
var badv []string
badf := func(format string, argv ...interface{}) {
badv = append(badv, fmt.Sprintf(format, argv...))
}
defer func() {
if badv != nil {
emsg := fmt.Sprintf("tidx.verify: fail:\n\n")
for _, bad := range badv {
emsg += fmt.Sprintf("- %s\n", bad)
}
emsg += fmt.Sprintf("\ntidx: %s\n", tidx)
panic(emsg)
}
}()
// recompute {} oid -> children and verify .nchild against it
children := map[zodb.Oid]SetOid{}
for oid, t := range tidx {
if t.parent != zodb.InvalidOid {
cc, ok := children[t.parent]
if !ok {
cc = SetOid{}
children[t.parent] = cc
}
cc.Add(oid)
}
}
for oid, t := range tidx {
cc := children[oid]
if t.nchild != len(cc) {
badf("[%s].nchild=%d children: %s", oid, t.nchild, cc)
}
}
// verify that all pointed-to parents are present in tidx
for oid := range children {
_, ok := tidx[oid]
if !ok {
badf("oid %s is pointed to via some .parent, but is not present in the set", oid)
}
}
}
// Update tidx with trees subsets from tidx2.
func (tidx trackIndex) Update(tidx2 trackIndex) {
//fmt.Printf("\n\nUpdate:\n")
//fmt.Printf("tidx: %s\n", tidx)
//fmt.Printf("tidx2: %s\n", tidx2)
tidx.verify()
tidx2.verify()
for oid, t2 := range tidx2 {
t, already := tidx[oid]
if !already {
t = &nodeTrack{parent: t2.parent, nchild: t2.nchild}
tidx[oid] = t
} else {
if t2.parent != t.parent {
// XXX or verify this at Track time and require
// that update is passed only entries with the
// same .parent? (then it would be ok to panic here)
// XXX -> error (e.g. due to corrupt data in ZODB)
panicf("node %s is reachable from multiple parents: %s %s",
oid, t.parent, t2.parent)
}
t.nchild += t2.nchild
}
/*
if t.parent != zodb.InvalidOid {
δnchild[t.parent]
}
*/
}
tidx.verify()
}
// ApplyΔ applies δ to trackIdx. XXX // ApplyΔ applies δ to trackIdx. XXX
func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) { func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) {
//fmt.Printf("\n\nApplyΔ\n") //fmt.Printf("\n\nApplyΔ\n")
...@@ -273,6 +359,8 @@ func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) { ...@@ -273,6 +359,8 @@ func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) {
//fmt.Printf("\tAdd: %v\n", δ.Add) //fmt.Printf("\tAdd: %v\n", δ.Add)
//defer fmt.Printf("\n\t-> tidx: %v\n", tidx) //defer fmt.Printf("\n\t-> tidx: %v\n", tidx)
tidx.verify()
δnchild := map[zodb.Oid]int{} δnchild := map[zodb.Oid]int{}
// remove leafs and thier parents // remove leafs and thier parents
...@@ -320,6 +408,8 @@ func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) { ...@@ -320,6 +408,8 @@ func (tidx trackIndex) ApplyΔ(δ *δtrackIndex) {
for _, oid := range gcq { for _, oid := range gcq {
tidx.gc1(oid) tidx.gc1(oid)
} }
tidx.verify()
} }
// treeSetKey represents ordered set of keys. // treeSetKey represents ordered set of keys.
...@@ -353,6 +443,7 @@ func NewΔBtail(at0 zodb.Tid, db *zodb.DB) *ΔBtail { ...@@ -353,6 +443,7 @@ func NewΔBtail(at0 zodb.Tid, db *zodb.DB) *ΔBtail {
δZtail: zodb.NewΔTail(at0), δZtail: zodb.NewΔTail(at0),
byRoot: map[zodb.Oid]*ΔTtail{}, byRoot: map[zodb.Oid]*ΔTtail{},
trackIdx: trackIndex{}, trackIdx: trackIndex{},
trackNew: trackIndex{},
holeIdxByRoot: map[zodb.Oid]treeSetKey{}, holeIdxByRoot: map[zodb.Oid]treeSetKey{},
db: db, db: db,
} }
...@@ -395,7 +486,8 @@ func (δBtail *ΔBtail) Track(key Key, keyPresent bool, path []Node) error { // ...@@ -395,7 +486,8 @@ func (δBtail *ΔBtail) Track(key Key, keyPresent bool, path []Node) error { //
for _, node := range path { pathv = append(pathv, vnode(node)) } for _, node := range path { pathv = append(pathv, vnode(node)) }
tracef("Track [%v] %s\n", key, strings.Join(pathv, " -> ")) tracef("Track [%v] %s\n", key, strings.Join(pathv, " -> "))
δBtail.trackIdx.AddNodePath(path) // δBtail.trackIdx.AddNodePath(path)
δBtail.trackNew.AddNodePath(path)
// track is track of path[-1] (i.e. leaf) // track is track of path[-1] (i.e. leaf)
...@@ -498,7 +590,12 @@ func (tidx trackIndex) AddPath(path []zodb.Oid) { ...@@ -498,7 +590,12 @@ func (tidx trackIndex) AddPath(path []zodb.Oid) {
// rebuild rebuilds ΔBtree taking trackNew requests into account. XXX // rebuild rebuilds ΔBtree taking trackNew requests into account. XXX
// XXX place // XXX place
func (δBtail *ΔBtail) rebuild() { func (δBtail *ΔBtail) rebuild() {
// XXX stub
// merge .trackNew into .trackIdx
δBtail.trackIdx.Update(δBtail.trackNew)
δBtail.trackNew = trackIndex{}
return return
panic("TODO") panic("TODO")
} }
...@@ -515,6 +612,9 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) { ...@@ -515,6 +612,9 @@ func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
headOld := δBtail.Head() headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail update %s -> %s", headOld, δZ.Tid) defer xerr.Contextf(&err, "ΔBtail update %s -> %s", headOld, δZ.Tid)
// update .trackIdx and vδB from .trackNew
δBtail.rebuild()
δBtail.δZtail.Append(δZ.Tid, δZ.Changev) δBtail.δZtail.Append(δZ.Tid, δZ.Changev)
tracef("Update δZ: %v\n", δZ.Changev) tracef("Update δZ: %v\n", δZ.Changev)
......
...@@ -704,9 +704,10 @@ func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zod ...@@ -704,9 +704,10 @@ func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zod
// verify δbtail.trackIdx against @at1 // verify δbtail.trackIdx against @at1
// trackIdx1 = xkv1[tracked1] // trackIdx1 = xkv1[tracked1]
trackIdx1 := xkv1.trackIdx(initialTrackedKeys) trackIdx1 := xkv1.trackIdx(initialTrackedKeys)
if !reflect.DeepEqual(trackIdx1, δbtail.trackIdx) { // if !reflect.DeepEqual(trackIdx1, δbtail.trackIdx) {
badf("δbtail.trackIdx1 wrong:\n\thave: %v\n\twant: %v", δbtail.trackIdx, trackIdx1) // badf("δbtail.trackIdx1 wrong:\n\thave: %v\n\twant: %v", δbtail.trackIdx, trackIdx1)
} // }
δbtail.assertTrack(t, "1", /*ø*/trackIndex{}, trackIdx1)
// δB <- δZ // δB <- δZ
...@@ -798,10 +799,11 @@ func assertTrack(t *testing.T, subj string, trackIdx, trackIdxOK trackIndex) { ...@@ -798,10 +799,11 @@ func assertTrack(t *testing.T, subj string, trackIdx, trackIdxOK trackIndex) {
// assertTrack verifies state of .trackIdx and .newTrackIdx. // assertTrack verifies state of .trackIdx and .newTrackIdx.
// XXX place // XXX place
func (δbtail *ΔBtail) assertTrack(t *testing.T, subj string, trackIdxOK, newTrackIdxOK trackIndex) { func (δbtail *ΔBtail) assertTrack(t *testing.T, subj string, trackIdxOK, trackNewOK trackIndex) {
assertTrack(t, subj + ": trackIdx", δbtail.trackIdx, trackIdxOK) assertTrack(t, subj + ": trackIdx", δbtail.trackIdx, trackIdxOK)
// XXX reenable assertTrack(t, subj + ": trackNew", δbtail.trackNew, trackNewOK)
// assertTrack(t, subj + ": newTrackIdx", δbtail.newTrackIdx, newTrackIdxOK)
// XXX also verify .holeIdx XXX or better get rid of .holeIdx
} }
// xverifyΔBTail_rebuild verifies δBtail.rebuild during t0->t1->t2 transition. // xverifyΔBTail_rebuild verifies δBtail.rebuild during t0->t1->t2 transition.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment