// Copyright (C) 2018-2021 Nexedi SA and Contributors. // Kirill Smelkov // // This program is free software: you can Use, Study, Modify and Redistribute // it under the terms of the GNU General Public License version 3, or (at your // option) any later version, as published by the Free Software Foundation. // // You can also Link and Combine this program with other software covered by // the terms of any of the Free Software licenses or any of the Open Source // Initiative approved licenses and Convey the resulting work. Corresponding // source of such a combination shall include the source code for all other // software used. // // This program is distributed WITHOUT ANY WARRANTY; without even the implied // warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. // // See COPYING file for full licensing terms. // See https://www.nexedi.com/licensing for rationale and options. package main // XXX -> Package xbtree complements package lab.nexedi.com/kirr/neo/go/zodb/btree. // TODO move -> btree when ΔTail matures. // XXX doc import ( "context" "sort" "strings" "lab.nexedi.com/kirr/go123/xerr" "lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/zodb" ) // ΔBtail represents tail of revisional changes to BTrees. // // It semantically consists of // // []δB ; rev ∈ (tail, head] // atTail XXX no need (see vvv) // // where δB represents a change in BTrees space // // δB: // .rev↑ // {} root -> {}(key, δvalue) XXX was value // // and atTail keeps set of k/v @tail for keys changed in (tail, head] // // atTail: XXX no need for atTail as we have δvalue.Old // {} root -> {}(key, value) // // It covers only changes to keys from tracked subset of BTrees parts. // In particular a key that was not explicitly requested to be tracked, even if // it was changed in δZ, is not guaranteed to be present in δB. 
//
// ΔBtail provides the following operations:
//
//   .Track(path)	- start tracking tree nodes and keys; root=path[0], keys=path[-1].keys
//			  XXX keys not correct - e.g. track missing key
//
//   .Update(δZ) -> δB				- update BTree δ tail given raw ZODB changes
//   .ForgetPast(revCut)			- forget changes past revCut
//   .SliceByRev(lo, hi) -> []δB		- XXX
//   .SliceByRootRev(root, lo, hi) -> []δT	- XXX
//
// XXX also:
//   .Get(root, key, at) - get root[key] @at assuming root[key] ∈ tracked
//
// An example for tracked set is a set of visited BTree paths.
// There is no requirement that tracked set belongs to only one single BTree.
//
// XXX δB can convert δZ to btree changes, but only at least for δZ's objects
// that ∈ BTree subgraphs that were explicitly requested to be tracked by δB.
//
// XXX incremental; not full coverage
//
// ΔBtail is not safe for concurrent access.
// XXX -> multiple readers / single writer?
//
// See also zodb.ΔTail
// XXX naming -> ΔBTail ?
type ΔBtail struct {
	// raw ZODB changes; kept to rebuild δBtail/byRoot after new Track.
	// Includes all changed objects, not only tracked ones.
	// Head() and Tail() of ΔBtail are taken directly from it.
	δZtail *zodb.ΔTail

	// XXX vvv keys ∈ tracked -> keys ∈ kadj[tracked] ?
	// vδB  []ΔB       // data with δB changes; Noted only by keys ∈ tracked subset

	// {} root -> [] k/v change history; only for keys ∈ tracked subset.
	// An entry is created the first time a non-empty change is computed for the root.
	byRoot map[zodb.Oid]*ΔTtail

	// handle to make connections to access database;
	// used to open connections to load new/old tree|buckets.
	// TODO allow client to optionally provide zconnOld/zconnNew on e.g. Update()
	db *zodb.DB

	// set of tracked nodes as of @head state.
	trackSet PPTreeSubSet

	// set of nodes that were requested to be tracked, but for which vδB was not yet rebuilt.
	// Track adds to it; rebuild drains it and merges the result into trackSet.
	trackNew PPTreeSubSet

	// {} root -> keys that were requested to be tracked while being absent (holes).
	// XXX root -> tracked holes
	// XXX move -> ΔTtail ?
	holeIdxByRoot map[zodb.Oid]treeSetKey
}

// ΔB represents a change in BTrees space.
type ΔB struct { Rev zodb.Tid ByRoot map[zodb.Oid]map[Key]ΔValue // {} root -> {}(key, δvalue) } // ΔTtail represent tail of revisional changes to one BTree. // // See ΔBtail documentation for details. type ΔTtail struct { vδT []ΔTree // changes to tree keys; rev↑. covers keys ∈ tracked subset // {}k/v @tail for keys that are changed in (tail, head]. KVAtTail map[Key]Value // XXX not needed since vδT has ΔValue ? // index for LastRevOf queries lastRevOf map[Key]zodb.Tid // {} key -> last } // ΔTree describes changes to one BTree in one revision. // XXX -> ΔT ? type ΔTree struct { Rev zodb.Tid ΔKV map[Key]ΔValue } // NewΔBtail creates new empty ΔBtail object. // // Initial tracked set is empty. // Initial coverage is (at₀, at₀]. // // db will be used by ΔBtail to open database connections to load data from // ZODB when needed. XXX or caller provides zhead/zprev explicitly? func NewΔBtail(at0 zodb.Tid, db *zodb.DB) *ΔBtail { return &ΔBtail{ δZtail: zodb.NewΔTail(at0), byRoot: map[zodb.Oid]*ΔTtail{}, trackSet: PPTreeSubSet{}, trackNew: PPTreeSubSet{}, holeIdxByRoot: map[zodb.Oid]treeSetKey{}, db: db, } } // xverifyΔBTail_rebuild needs ΔBTree.clone because otherwise it is too slow to run that test. func (orig *ΔBtail) clone() *ΔBtail { klon := &ΔBtail{db: orig.db} // δZtail klon.δZtail = zodb.NewΔTail(orig.Tail()) for _, δZ := range orig.δZtail.Data() { klon.δZtail.Append(δZ.Rev, δZ.Changev) } // trackSet, trackNew klon.trackSet = orig.trackSet.Clone() klon.trackNew = orig.trackNew.Clone() // byRoot klon.byRoot = make(map[zodb.Oid]*ΔTtail, len(orig.byRoot)) for root, origΔTtail := range orig.byRoot { klonΔTtail := &ΔTtail{} klonΔTtail.vδT = append(klonΔTtail.vδT, origΔTtail.vδT...) 
klonΔTtail.KVAtTail = make(map[Key]Value, len(origΔTtail.KVAtTail)) for k, v := range origΔTtail.KVAtTail { klonΔTtail.KVAtTail[k] = v } klonΔTtail.lastRevOf = make(map[Key]zodb.Tid, len(origΔTtail.lastRevOf)) for k, rev := range origΔTtail.lastRevOf { klonΔTtail.lastRevOf[k] = rev } klon.byRoot[root] = klonΔTtail } // holeIdxByRoot klon.holeIdxByRoot = make(map[zodb.Oid]treeSetKey, len(orig.holeIdxByRoot)) for root, origHoleIdx := range orig.holeIdxByRoot { klonHoleIdx := treeSetKey{make(SetKey, len(origHoleIdx.SetKey))} klonHoleIdx.Update(origHoleIdx.SetKey) klon.holeIdxByRoot[root] = klonHoleIdx } return klon } // newΔTtail creates new empty ΔTtail object. func newΔTtail() *ΔTtail { return &ΔTtail{ KVAtTail: make(map[Key]Value), lastRevOf: make(map[Key]zodb.Tid), } } // (tail, head] coverage func (δBtail *ΔBtail) Head() zodb.Tid { return δBtail.δZtail.Head() } func (δBtail *ΔBtail) Tail() zodb.Tid { return δBtail.δZtail.Tail() } // XXX SliceByRev? // XXX ForgetPast // Track adds tree path to tracked set. // // path[0] signifies tree root. // All path elements must be Tree except last one which must be Bucket. // // Besides key (which might point to value or hole), δBtail will also track all // keys residing in tracked leaf nodes. In particular after request for KeyMax or // KeyMin to be tracked, δBtail will keep on tracking changes to maximum or // minimum keys correspondingly. // // XXX objects in path must be with .PJar().At() == .head // XXX path -> []oid ? // // XXX catch cycles on add? func (δBtail *ΔBtail) Track(key Key, keyPresent bool, path []Node) error { // XXX Tree|Bucket; path[0] = root // XXX assert Tree Tree ... Tree Bucket XXX AddNodePath does that root := path[0].(*Tree).POid() pathv := []string{} for _, node := range path { pathv = append(pathv, vnode(node)) } tracef("Track [%v] %s\n", key, strings.Join(pathv, " -> ")) δBtail.trackNew.AddNodePath(path) // track is track of path[-1] (i.e. 
leaf) // remember missing keys in track of leaf node (bucket or top-level ø tree) holeIdx := δBtail.holeIdxFor(root) if !keyPresent { holeIdx.Add(key) //track.holes.Add(key) } else { /* if track.holes.Has(key) { panicf("[%v] was previously requested to be tracked as ø", key) } */ if holeIdx.Has(key) { panicf("[%v] was previously requested to be tracked as ø", key) } } // XXX update diff XXX here? or as separate step? // XXX update lastRevOf return nil } func (δBtail *ΔBtail) holeIdxFor(root zodb.Oid) treeSetKey { holeIdx, ok := δBtail.holeIdxByRoot[root] if !ok { holeIdx = treeSetKey{SetKey{}} δBtail.holeIdxByRoot[root] = holeIdx } return holeIdx } // rebuild rebuilds ΔBtail taking trackNew requests into account. // XXX place func (δBtail *ΔBtail) rebuild() (err error) { defer xerr.Context(&err, "ΔBtail rebuild") // XXX locking trackNew := δBtail.trackNew δBtail.trackNew = PPTreeSubSet{} if len(trackNew) == 0 { return } // go backwards and merge vδT <- treediff(lo..hi/trackNew) vδZ := δBtail.δZtail.Data() vδtrack := []*ΔPPTreeSubSet{} for i := len(vδZ)-1; i>=0; i-- { δZ := vδZ[i] // XXX dup wrt Update? var atPrev zodb.Tid if i > 0 { atPrev = vδZ[i-1].Rev } else { atPrev = δBtail.δZtail.Tail() } δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, trackNew) tracef("\nrebuild @%s <- @%s\n", atPrev, δZ.Rev) tracef("δZ:\t%v\n", δZ.Changev) tracef("trackNew: %v\n", trackNew) tracef("trackSet: %v\n", δBtail.trackSet) // XXX needed? 
defer tracef("\n\n") // XXX len(δtopsByRoot) == 0 -> skip // open ZODB connection corresponding to "current" and "prev" states txn, ctx := transaction.New(context.TODO()) // XXX defer txn.Abort() // XXX -> into func() or don't use defer zconnPrev, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: atPrev}) if err != nil { return err } zconnCurr, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: δZ.Rev}) if err != nil { return err } for root, δtops := range δtopsByRoot { holeIdx := treeSetKey{SetKey{}} // XXX stub // diff backwards curr -> prev δT, δtrack, err := treediff(ctx, root, δtops, δZTC, trackNew, holeIdx, zconnCurr, zconnPrev) if err != nil { return err } tracef("-> root<%s> δkv*: %v δtrack*: %v\n", root, δT, δtrack) trackNew.ApplyΔ(δtrack) vδtrack = append([]*ΔPPTreeSubSet{δtrack}, vδtrack...) if len(δT) == 0 { // an object might be resaved without change continue } δTtail, ok := δBtail.byRoot[root] if !ok { // this root was not tracked before -> create δTtail for it with empty changes δTtail = newΔTtail() δBtail.byRoot[root] = δTtail } // δTtail.vδT <- merge δT* l := len(δTtail.vδT) j := sort.Search(l, func(k int) bool { return δZ.Rev <= δTtail.vδT[k].Rev }) if j == l || δTtail.vδT[j].Rev != δZ.Rev { δTcurr := ΔTree{Rev: δZ.Rev, ΔKV: map[Key]ΔValue{}} // insert(@j, δTcurr) δTtail.vδT = append(δTtail.vδT[:j], append([]ΔTree{δTcurr}, δTtail.vδT[j:]...)...) } δTcurr := δTtail.vδT[j] for k, δv := range δT { // the diff was backward; δTtail entries are with diff forward δv.New, δv.Old = δv.Old, δv.New δv_, already := δTcurr.ΔKV[k] if already { if δv != δv_ { panicf("[%v] inconsistent δv:\nδTcurr: %v\nδT: %v", k, δTcurr, δT) } } else { δTcurr.ΔKV[k] = δv } } // XXX update .KVAtTail, .lastRevOf } } // trackNew was adjusted to correspond to @tail potentially growing its key coverage. 
// Remap it back to @head and merge to .trackSet for _, δtrack := range vδtrack { δtrack.Reverse() // we saved it as lo<-hi; now we go lo->hi trackNew.ApplyΔ(δtrack) } δBtail.trackSet.UnionInplace(trackNew) return nil } // Update updates δB with per-object level ZODB changes. // // Only those objects from δZ that belong to tracked set are guaranteed to be // taken into account. In other words a tree history will assuredly include // only those keys, that correspond to tracked subset of δZ. // // δZ must include all objects changed by ZODB transaction. // // TODO optionally accept zconnOld/zconnNew from client func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) { headOld := δBtail.Head() defer xerr.Contextf(&err, "ΔBtail update %s -> %s", headOld, δZ.Tid) // update .trackSet and vδB from .trackNew δBtail.rebuild() tracef("Update @%s -> @%s\n", δBtail.Head(), δZ.Tid) tracef("δZ:\t%v\n", δZ.Changev) tracef("trackSet: %v\n", δBtail.trackSet) tracef("holeIdxByRoot: %v\n", δBtail.holeIdxByRoot) // XXX dup wrt rebuild? 
δBtail.δZtail.Append(δZ.Tid, δZ.Changev) δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, δBtail.trackSet) δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]ΔValue)} // skip opening DB connections if there is no change to any tree node if len(δtopsByRoot) == 0 { return δB, nil } // open ZODB connections corresponding to "old" and "new" states // TODO caller should provide one of those (probably new) as usually it has it txn, ctx := transaction.New(context.TODO()) // XXX defer txn.Abort() zconnOld, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: headOld}) if err != nil { return ΔB{}, err } zconnNew, err := δBtail.db.Open(ctx, &zodb.ConnOptions{At: δZ.Tid}) if err != nil { return ΔB{}, err } for root, δtops := range δtopsByRoot { holeIdx := δBtail.holeIdxByRoot[root] δT, δtrack, err := treediff(ctx, root, δtops, δZTC, δBtail.trackSet, holeIdx, zconnOld, zconnNew) if err != nil { return ΔB{}, err } tracef("\n-> root<%s> δkv: %v δtrack: %v\n", root, δT, δtrack) if len(δT) > 0 { // an object might be resaved without change δB.ByRoot[root] = δT δTtail, ok := δBtail.byRoot[root] if !ok { // this root was not tracked before -> create δTtail for it with empty changes δTtail = newΔTtail() δBtail.byRoot[root] = δTtail } δTtail.vδT = append(δTtail.vδT, ΔTree{Rev: δZ.Tid, ΔKV: δT}) // XXX rebuild KVAtTail // XXX rebuild lastRevOf } δBtail.trackSet.ApplyΔ(δtrack) } return δB, nil } func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) { δBtail.δZtail.ForgetPast(revCut) // XXX stub // XXX clean vδT } // update brings .δBtail up to date by recomputing diff XXX and taking new // entries in .δZtail into account. // func (δBtail *ΔBtail) update() // Get returns root[key] as of @at database state plus revision that changed it. // // if revExact=False - rev is upper estimate for the revision. // // key must be tracked // at must ∈ (tail, head] // // XXX root -> Oid ? 
func (δBtail *ΔBtail) GetAt(ctx context.Context, root *Tree, key Key, at zodb.Tid) (value Value, ok bool, rev zodb.Tid, revExact bool, err error) { defer xerr.Contextf(&err, "δBtail: root<%s>: get %d @%s", root.POid(), key, at) // XXX key not tracked -> panic // XXX at not ∈ (tail, head] -> panic // XXX handle deletion // FIXME stub -> that only ZBlk.rev is used //return @head, rev=.Tail(), revExact=false // XXX dirty -> rebuild // XXX -> index lastXXXOf(key) | linear scan ↓ looking for change <= at δTtail := δBtail.byRoot[root.POid()] if δTtail == nil { panicf("δBtail: root<%s> not tracked", root.POid()) } for i := len(δTtail.vδT)-1; i >= 0; i-- { δT := δTtail.vδT[i] if at < δT.Rev { continue } var δvalue ΔValue δvalue, ok = δT.ΔKV[key] if ok { value = δvalue.New rev = δT.Rev revExact = true break } } // key was found in δT ∈ δTtail if ok { return } // key not in history tail. // either use @tail[key], if it is present, or @head[key] rev = δBtail.Tail() revExact = false value, ok = δTtail.KVAtTail[key] // XXX kill - just use δvalue.Old from next-to-at entry if ok { return } // @tail[key] is not present - key was not changing in (tail, head]. // since at ∈ (tail, head] we can use @head[key] as the result xvalue, ok, err := root.Get(ctx, key) if err != nil || !ok { return } value, err = vOid(xvalue) if err != nil { ok = false return } return } // XXX don't need //func (δBtail *ΔBtail) SliceByRev(lo, hi zodb.Tid) /*readonly*/ []ΔB { // δassertSlice(δBtail, lo, hi) // panic("TODO") //} // SliceByRootRev returns history of a tree changes in (lo, hi] range. // // it must be called with the following condition: // // tail ≤ lo ≤ hi ≤ head // // the caller must not modify returned slice. // // Note: contrary to regular go slicing, low is exclusive while high is inclusive. // // XXX root -> *Tree ? 
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) /*readonly*/[]ΔTree { δassertSlice(δBtail, lo, hi) // XXX locking δTtail, ok := δBtail.byRoot[root] if !ok { return []ΔTree{} } // XXX dup data - because they can be further rebuild in parallel to caller using them return δTtail.vδT // FIXME process lo, hi } // ---- misc ---- // δassertSlice asserts that δ.tail ≤ lo ≤ hi ≤ δ.head func δassertSlice(δ interface { Head() zodb.Tid; Tail() zodb.Tid }, lo, hi zodb.Tid) { tail := δ.Tail() head := δ.Head() if !(tail <= lo && lo <= hi && hi <= head) { panicf("invalid slice: (%s, %s]; (tail, head] = (%s, %s]", lo, hi, tail, head) } } /* // vtree returns human-readable representation of Tree data. func vtree(v []TreeEntry) string { s := "" for _, e := range v { if s != "" { s += " " } s += fmt.Sprintf("%v %s", e.Key(), vnode(e.Child())) } return s } */ // vnode returns brief human-readable representation of node. func vnode(node Node) string { kind := "?" switch node.(type) { case *Tree: kind = "T" case *Bucket: kind = "B" } return kind + node.POid().String() }