Commit 05c78d30 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 6e933964
...@@ -135,7 +135,7 @@ type ΔValue struct { ...@@ -135,7 +135,7 @@ type ΔValue struct {
// //
// It covers only changes to keys from tracked subset of BTrees parts. // It covers only changes to keys from tracked subset of BTrees parts.
// In particular a key that was not explicitly requested to be tracked, even if // In particular a key that was not explicitly requested to be tracked, even if
// it was changed in δZ, is not guaranted to be present in δB. // it was changed in δZ, is not guaranteed to be present in δB.
// //
// ΔBtail provides the following operations: // ΔBtail provides the following operations:
// //
...@@ -164,8 +164,8 @@ type ΔBtail struct { ...@@ -164,8 +164,8 @@ type ΔBtail struct {
δZtail *zodb.ΔTail δZtail *zodb.ΔTail
// XXX vvv keys ∈ tracked -> keys ∈ kadj[tracked] ? // XXX vvv keys ∈ tracked -> keys ∈ kadj[tracked] ?
// δRtail []ΔRoots // which BTree were changed; Noted only by keys ∈ tracket subset // δRtail []ΔRoots // which BTree were changed; Noted only by keys ∈ tracked subset
byRoot map[zodb.Oid]*ΔTtail // {} root -> [] k/v change history; only for keys ∈ tracket subset byRoot map[zodb.Oid]*ΔTtail // {} root -> [] k/v change history; only for keys ∈ tracked subset
// handle to make connections to access database. // handle to make connections to access database.
// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update() // TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
...@@ -312,7 +312,12 @@ func (δBtail *ΔBtail) Track(key Key, keyPresent bool, path []Node) error { // ...@@ -312,7 +312,12 @@ func (δBtail *ΔBtail) Track(key Key, keyPresent bool, path []Node) error { //
var oldTrack bool var oldTrack bool
for _, node := range path { for _, node := range path {
oid := node.POid() oid := node.POid()
// XXX check for InvalidOid (e.g. T/B1:a with bucket not having its own oid. // InvalidOid means embedded bucket - e.g. T/B1:a with bucket not having its own oid.
//if oid == zodb.InvalidOid {
// // XXX verify node is leaf; else panic
// continue
//}
track, oldTrack = δBtail.trackIdx[oid] track, oldTrack = δBtail.trackIdx[oid]
if !oldTrack { if !oldTrack {
track = nodeTrack{parent: parent} track = nodeTrack{parent: parent}
...@@ -484,6 +489,7 @@ func (δBtail *ΔBtail) δZConnectTracked(δZv *zodb.EventCommit) (δZTC SetOid, ...@@ -484,6 +489,7 @@ func (δBtail *ΔBtail) δZConnectTracked(δZv *zodb.EventCommit) (δZTC SetOid,
// XXX place // XXX place
// nodeInRange represents a Node coming under [lo, hi_] key range in its tree. // nodeInRange represents a Node coming under [lo, hi_] key range in its tree.
type nodeInRange struct { type nodeInRange struct {
// XXX +parent?
lo, hi_ Key // [lo, hi_] NOTE _not_ hi) not to overflow at ∞ lo, hi_ Key // [lo, hi_] NOTE _not_ hi) not to overflow at ∞
node Node node Node
done bool // whether this node was already taken into account while computing diff done bool // whether this node was already taken into account while computing diff
...@@ -783,7 +789,7 @@ func diffT(ctx context.Context, A, B *Tree, δZTC SetOid, trackIdx map[zodb.Oid] ...@@ -783,7 +789,7 @@ func diffT(ctx context.Context, A, B *Tree, δZTC SetOid, trackIdx map[zodb.Oid]
// a node ac does not contribute to δ- and can be skipped, if: // a node ac does not contribute to δ- and can be skipped, if:
// - ac is not tracked, or // - ac is not tracked, or
// - ac ∉ δZTC && ∃ bc from B: ac.oid == bc.oid (ac was not changed and stays in the tree) // - ac ∉ δZTC && ∃ bc from B: ac.oid == bc.oid (ac was not changed and stays in the tree)
Aq := []*nodeInRange{atop} // queue for A nodes that contribyte to δ- Aq := []*nodeInRange{atop} // queue for A nodes that contribute to δ-
for len(Aq) > 0 { for len(Aq) > 0 {
tracef("\n") tracef("\n")
tracef(" aq: %v\n", Aq) tracef(" aq: %v\n", Aq)
......
...@@ -271,7 +271,7 @@ func (tg *AllStructsSrv) AllStructs(kv1, kv2 map[Key]string, maxdepth, maxsplit, ...@@ -271,7 +271,7 @@ func (tg *AllStructsSrv) AllStructs(kv1, kv2 map[Key]string, maxdepth, maxsplit,
// RTree represents Tree node covering [lo, hi_] key range in its parent tree. // RTree represents Tree node covering [lo, hi_] key range in its parent tree.
type RTree struct { type RTree struct {
oid Oid oid zodb.Oid
parent *RTree parent *RTree
// XXX +children? // XXX +children?
} }
...@@ -279,7 +279,7 @@ type RTree struct { ...@@ -279,7 +279,7 @@ type RTree struct {
// RBucket represents Bucket node covering [lo, hi_] key range in its Tree. // RBucket represents Bucket node covering [lo, hi_] key range in its Tree.
// NOTE it is not [lo,hi) but [lo,hi_] instead to avoid overflow at KeyMax. // NOTE it is not [lo,hi) but [lo,hi_] instead to avoid overflow at KeyMax.
type RBucket struct { type RBucket struct {
oid Oid oid zodb.Oid
parent *RTree parent *RTree
lo, hi_ Key lo, hi_ Key
kv map[Key]string // bucket's k->v; values were ZBlk objects whose data is loaded instead. kv map[Key]string // bucket's k->v; values were ZBlk objects whose data is loaded instead.
...@@ -321,6 +321,59 @@ func (rbs RBucketSet) coverage() string { ...@@ -321,6 +321,59 @@ func (rbs RBucketSet) coverage() string {
return s return s
} }
// holeIdx returns what should be ΔBtree.holeIdx for specified tracked key set.
func (rbs RBucketSet) holeIdx(tracked SetKey) SetKey {
holes := SetKey{}
for k := range tracked {
_, keyin := rbs.Get(k).kv[k]
if !keyin {
holes.Add(k)
}
}
return holes
}
// trackIdx returns what should be ΔBtree.trackIdx for specified tracked key set.
func (rbs RBucketSet) trackIdx(tracked SetKey) map[zodb.Oid]nodeTrack {
trackIdx := map[zodb.Oid]nodeTrack{}
for k := range tracked {
kb := rbs.Get(k)
// trackIdx records regular buckets or non-empty embedded bucket
// (empty embedded bucket means there is just empty tree node
// and only that is recorded in trackIdx)
if kb.oid != zodb.InvalidOid || len(kb.kv) != 0 {
track, already := trackIdx[kb.oid]
if !already {
track = nodeTrack{parent: kb.parent.oid}
trackIdx[kb.oid] = track
}
if track.parent != kb.parent.oid {
panicf("BUG: %s: B%s -> multiple parents: %s %s", rbs.coverage(), kb.oid, track.parent, kb.parent.oid)
}
}
p := kb.parent
for p != nil {
ppoid := zodb.InvalidOid // oid of p.parent
if p.parent != nil {
ppoid = p.parent.oid
}
pt, already := trackIdx[p.oid]
if !already {
pt = nodeTrack{parent: ppoid}
trackIdx[p.oid] = pt
}
if pt.parent != ppoid {
panicf("BUG: %s: T%s -> multiple parents: %s %s", rbs.coverage(), p.oid, pt.parent, ppoid)
}
p = p.parent
}
}
return trackIdx
}
// XGetTree loads Tree from zurl@at->obj<root>. // XGetTree loads Tree from zurl@at->obj<root>.
...@@ -348,7 +401,7 @@ func XGetTree(db *zodb.DB, at zodb.Tid, root zodb.Oid) RBucketSet { ...@@ -348,7 +401,7 @@ func XGetTree(db *zodb.DB, at zodb.Tid, root zodb.Oid) RBucketSet {
}) })
if len(rbucketv) == 0 { // empty tree -> [-∞, ∞){} if len(rbucketv) == 0 { // empty tree -> [-∞, ∞){}
etree := &RTree{ etree := &RTree{
oid: zodb.InvalidOid, oid: root,
parent: nil, parent: nil,
} }
ebucket := &RBucket{ ebucket := &RBucket{
...@@ -386,7 +439,7 @@ func _xwalkDFS(ctx context.Context, lo, hi_ Key, ztree *Tree, rparent *RTree, bv ...@@ -386,7 +439,7 @@ func _xwalkDFS(ctx context.Context, lo, hi_ Key, ztree *Tree, rparent *RTree, bv
tchild, ok := ev[i].Child().(*Tree) tchild, ok := ev[i].Child().(*Tree)
if ok { if ok {
_xwalkDFS(ctx, xlo, xhi_, tchild, rparent, bvisit) _xwalkDFS(ctx, xlo, xhi_, tchild, rtree, bvisit)
continue continue
} }
...@@ -616,19 +669,17 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, ...@@ -616,19 +669,17 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid,
// verify δbtail.holeIdx against @at1 // verify δbtail.holeIdx against @at1
// holes1 = tracked1 \ kv1 // holes1 = tracked1 \ kv1
holes1 := SetKey{} holes1 := xkv1.holeIdx(initialTrackedKeys)
for k := range initialTrackedKeys {
_, keyin1 := xkv1.Get(k).kv[k]
if !keyin1 {
holes1.Add(k)
}
}
if !reflect.DeepEqual(holes1, δbtail.holeIdx.SetKey) { if !reflect.DeepEqual(holes1, δbtail.holeIdx.SetKey) {
badf("δbtail.holeIdx1 wrong ; holeIdx=%v holeIdxOK=%v", δbtail.holeIdx, holes1) badf("δbtail.holeIdx1 wrong ; holeIdx=%v holeIdxOK=%v", δbtail.holeIdx, holes1)
} }
// verify δbtail.trackIdx against @at1 // verify δbtail.trackIdx against @at1
// tracked1 = ... XXX // trackIdx1 = xkv1[tracked1]
trackIdx1 := xkv1.trackIdx(initialTrackedKeys)
if !reflect.DeepEqual(trackIdx1, δbtail.trackIdx) {
badf("δbtail.trackIdx1 wrong ; trackIdx=%v trackIdxOK=%v", δbtail.trackIdx, trackIdx1)
}
// δB <- δZ // δB <- δZ
...@@ -640,20 +691,19 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, ...@@ -640,20 +691,19 @@ func xverifyΔBTail1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid,
// verify δbtail.holeIdx against @at2 // verify δbtail.holeIdx against @at2
// holes2 = tracked2 \ kv2 ( = kadj[tracked1] \ kv2) // holes2 = tracked2 \ kv2 ( = kadj[tracked1] \ kv2)
holes2 := SetKey{} holes2 := xkv2.holeIdx(kadjTracked)
for k := range kadjTracked {
_, keyin2 := xkv2.Get(k).kv[k]
if !keyin2 {
holes2.Add(k)
}
}
if !reflect.DeepEqual(holes2, δbtail.holeIdx.SetKey) { if !reflect.DeepEqual(holes2, δbtail.holeIdx.SetKey) {
badf("δbtail.holeIdx2 wrong ; holeIdx=%v holeIdxOK=%v", δbtail.holeIdx, holes2) badf("δbtail.holeIdx2 wrong ; holeIdx=%v holeIdxOK=%v", δbtail.holeIdx, holes2)
} }
// XXX verify δbtail index consistency against @at2 // verify δbtail.traciIdx against @at2
// XXX verify that removed keys remain in trackedIdx (holes), so that e.g. // trackIdx2 = xkv2[tracked2] ( = xkv2[kadj[tracked1]]
// track [k]; k:a->b; k:b->ø; k:ø->c is not missed /* XXX reenable
trackIdx2 := xkv2.trackIdx(kadjTracked)
if !reflect.DeepEqual(trackIdx2, δbtail.trackIdx) {
badf("δbtail.trackIdx2 wrong ; trackIdx=%v trackIdxOK=%v", δbtail.trackIdx, trackIdx2)
}
*/
// assert δB.ByRoot == {treeRoot -> ...} if δTok != ø // assert δB.ByRoot == {treeRoot -> ...} if δTok != ø
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment