// Copyright (C) 2020-2021  Nexedi SA and Contributors.
//                          Kirill Smelkov <kirr@nexedi.com>
//
// This program is free software: you can Use, Study, Modify and Redistribute
// it under the terms of the GNU General Public License version 3, or (at your
// option) any later version, as published by the Free Software Foundation.
//
// You can also Link and Combine this program with other software covered by
// the terms of any of the Free Software licenses or any of the Open Source
// Initiative approved licenses and Convey the resulting work. Corresponding
// source of such a combination shall include the source code for all other
// software used.
//
// This program is distributed WITHOUT ANY WARRANTY; without even the implied
// warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
//
// See COPYING file for full licensing terms.
// See https://www.nexedi.com/licensing for rationale and options.

package zhist
// tests for δbtail.go
//
// This are the main tests for ΔBtail functionality. There are two primary testing concerns:
//
//   1) to verify treediff algorithm, and
//   2) to verify how ΔTtail rebuilds its history entries when set of tracked keys
//      grow upon either new Track requests, or upon Update that turned out to
//      trigger such growth of set of tracked keys.
//
// TestΔBTail*/Update and TestΔBTail*/rebuild exercise points "1" and "2" correspondingly.
//
// There are 2 testing approaches:
//
//   a) transition a BTree in ZODB through particular tricky tree topologies
//      and feed ΔBtail through created database transactions.
//   b) transition a BTree in ZODB through random tree topologies and feed
//      ΔBtail through created database transactions.
//
// TestΔBTail and TestΔBTailAllStructs implement approaches "a" and "b" correspondingly.
//
// testprog/treegen.py is used as helper to both:
//
//   - commit a particular BTree topology into ZODB, and
//   - to generate set of random tree topologies that all correspond to particular {k->v} dict.

import (
	"context"
	"flag"
	"fmt"
	"math"
	"math/rand"
	"reflect"
	"sort"
	"strings"
	"testing"
	"time"

	"lab.nexedi.com/kirr/go123/exc"
	"lab.nexedi.com/kirr/go123/xerr"
	"lab.nexedi.com/kirr/neo/go/transaction"
	"lab.nexedi.com/kirr/neo/go/zodb"
	_ "lab.nexedi.com/kirr/neo/go/zodb/wks"
)

// trackSet returns what should be ΔBtail.trackSet coverage for specified tracked key set.
func (rbs RBucketSet) trackSet(tracked setKey) PPTreeSubSet {
	// nil = don't compute keyCover
	// (trackSet is called from inside hot inner loop of rebuild test)
	trackSet := rbs._trackSetWithCov(tracked, nil)
	return trackSet
}

// trackSetWithCov returns what should be ΔBtail.trackSet and its key coverage for specified tracked key set.
func (rbs RBucketSet) trackSetWithCov(tracked setKey) (trackSet PPTreeSubSet, keyCover *RangedKeySet) {
	keyCover = &RangedKeySet{}
	trackSet = rbs._trackSetWithCov(tracked, keyCover)
	return trackSet, keyCover
}

func (rbs RBucketSet) _trackSetWithCov(tracked setKey, outKeyCover *RangedKeySet) (trackSet PPTreeSubSet) {
	trackSet = PPTreeSubSet{}
	for k := range tracked {
		kb := rbs.Get(k)
		if outKeyCover != nil {
			outKeyCover.AddRange(KeyRange{kb.lo, kb.hi_})
		}
		// trackSet explicitly records only regular buckets.
		// embedded buckets all have oid=zodb.InvalidOid and would lead to z
		newNode := false
		if kb.oid != zodb.InvalidOid {
			track, already := trackSet[kb.oid]
			if !already {
				track = &nodeInTree{parent: kb.parent.oid, nchild: 0}
				trackSet[kb.oid] = track
				newNode = true
			}
			if track.parent != kb.parent.oid {
				panicf("BUG: %s: B%s -> multiple parents: %s %s", rbs.coverage(), kb.oid, track.parent, kb.parent.oid)
			}
		}

		p := kb.parent
		for p != nil {
			ppoid := zodb.InvalidOid // oid of p.parent
			if p.parent != nil {
				ppoid = p.parent.oid
			}

			newParent := false
			pt, already := trackSet[p.oid]
			if !already {
				pt = &nodeInTree{parent: ppoid, nchild: 0}
				trackSet[p.oid] = pt
				newParent = true
			}
			if pt.parent != ppoid {
				panicf("BUG: %s: T%s -> multiple parents: %s %s", rbs.coverage(), p.oid, pt.parent, ppoid)
			}

			if newNode {
				pt.nchild++
			}
			newNode = newParent
			p = p.parent
		}
	}
	return trackSet
}



// XGetδKV translates {k -> δ<oid>} to {k -> δ(ZBlk(oid).data)} according to t1..t2 db snapshots.
func XGetδKV(t1, t2 *tTreeCommit, δkvOid map[Key]ΔValue) map[Key]Δstring {
	δkv := make(map[Key]Δstring, len(δkvOid))
	for k, δvOid := range δkvOid {
		δkv[k] = Δstring{
			Old: t1.xgetBlkData(δvOid.Old),
			New: t2.xgetBlkData(δvOid.New),
		}
	}
	return δkv
}

// KAdjMatrix is adjacency matrix that describes how set of tracked keys
// changes (always grow) when tree topology is updated from A to B.
//
// Adjacency matrix
//
// A,  B  - topologies			ex T/B1    T2/B1-B3
// Av, Bv - topologies with values	ex T/B1:a  T2/B1:b-B3:c
//
// δ(Av, Bv) - full diff {k -> v} for changed keys; DEL = k -> ø
//             ex δ(T/B1:a, T2/B1:b-B3:c) = {-1:a +1:b +3:c}
//
// Δ(T, Av, Bv) - subset of δ(Av, Bv) corresponding to initial tracking set T
//             ex Δ({1}, T/B1:a, T2/B1:b-B3:c) = {-1:a +1:b}  (no +3:c)		XXX fix example
//
// kadj(A,B) {} k -> {k'}:	- adjacency matrix
//	∃v1,v2:  k'∈ Δ({k}, Av1, Bv2)
//
//	ex kadj(T/B1, T2/B1-B3) = {1:{1} 3:{1,3} ∞:{1,3}}	k ∈ A+B+{∞}	XXX adjust to new kadj
//                                + {0:{1} 2:{1,3} + ... all possible keys}
//
// Δ(T, Av, Bv) = δ(Av, Bv)/kadj(A,B)[T]
//
// i.e. = δ(Av, Bv) for k: k ∈ U kadj(A,B)[·]
//                            ·∈T
//
// XXX fix definition for "and changed, or coverage changed"
//
// Note: adjacency matrix is symmetric (KAdj verifies this at runtime):
//
//	kadj(A,B) == kadj(B,A)
type KAdjMatrix map[Key]setKey

// Map returns kadj·keys .
func (kadj KAdjMatrix) Map(keys setKey) setKey {
	res := make(setKey, len(keys))
	for k := range keys {
		to, ok := kadj[k]
		if !ok {
			panicf("kadj.Map: %d ∉ kadj\n\nkadj: %v", k, kadj)
		}
		res.Update(to)
	}
	return res
}

// Mul returns kadjA·kadjB .
//
// (kadjA·kadjB).Map(keys) = kadjA.Map(kadjB.Map(keys))
func (kadjA KAdjMatrix) Mul(kadjB KAdjMatrix) KAdjMatrix {
	// ~ assert kadjA.keys == kadjB.keys
	// check only len here; the rest will be asserted by Map
	if len(kadjA) != len(kadjB) {
		panicf("kadj.Mul: different keys:\n\nkadjA: %v\nkadjB: %v", kadjA, kadjB)
	}

	kadj := make(KAdjMatrix, len(kadjB))
	for k, tob := range kadjB {
		kadj[k] = kadjA.Map(tob)
	}
	return kadj
}

// KAdj computes adjacency matrix for t1 -> t2 transition.
//
// The set of keys for which kadj matrix is computed can be optionally provided.
// This set of keys defaults to allTestKeys(t1,t2).
//
// KAdj itself is verified by testΔBTail on entries with .kadjOK set.
func KAdj(t1, t2 *tTreeCommit, keysv ...setKey) (kadj KAdjMatrix) {
	// assert KAdj(A,B) == KAdj(B,A)
	kadj12 := _KAdj(t1,t2, keysv...)
	kadj21 := _KAdj(t2,t1, keysv...)
	if !reflect.DeepEqual(kadj12, kadj21) {
		panicf("KAdj not symmetric:\nt1: %s\nt2: %s\nkadj12: %v\nkadj21: %v",
			t1.tree, t2.tree, kadj12, kadj21)
	}
	return kadj12
}

const debugKAdj = false
func debugfKAdj(format string, argv ...interface{}) {
	if debugKAdj {
		fmt.Printf(format, argv...)
	}
}

func _KAdj(t1, t2 *tTreeCommit, keysv ...setKey) (kadj KAdjMatrix) {
	var keys setKey
	switch len(keysv) {
	case 0:
		keys = allTestKeys(t1, t2)
	case 1:
		keys = keysv[0]
	default:
		panic("multiple key sets on the call")
	}

	debugfKAdj("\n\n_KAdj\n")
	debugfKAdj("t1:   %s\n", t1.tree)
	debugfKAdj("t2:   %s\n", t2.tree)
	debugfKAdj("keys: %s\n", keys)
	defer func() {
		debugfKAdj("kadj -> %v\n", kadj)
	}()

	// kadj = {} k -> adjacent keys.
	// if k is tracked and covered by changed leaf -> changes to adjacents must be in Update(t1->t2).
	kadj = KAdjMatrix{}
	for k := range keys {
		adj1 := setKey{}
		adj2 := setKey{}

		q1 := &RangedKeySet{}; q1.Add(k)
		q2 := &RangedKeySet{}; q2.Add(k)
		done1 := &RangedKeySet{}
		done2 := &RangedKeySet{}

		debugfKAdj("\nk%s\n", kstr(k))
		for !q1.Empty() || !q2.Empty() {
			debugfKAdj("q1:   %s\tdone1:  %s\n", q1, done1)
			debugfKAdj("q2:   %s\tdone2:  %s\n", q2, done2)
			for _, r1 := range q1.AllRanges() {
				lo1 := r1.lo
				for {
					b1 := t1.xkv.Get(lo1)
					debugfKAdj("  b1: %s\n", b1)
					for k_ := range keys {
						if b1.lo <= k_ && k_ <= b1.hi_ {
							adj1.Add(k_)
							debugfKAdj("    adj1 += %s\t-> %s\n", kstr(k_), adj1)
						}
					}
					b1r := KeyRange{b1.lo, b1.hi_}
					done1.AddRange(b1r)
					// q2 |= (b1.keyrange \ done2)
					δq2 := &RangedKeySet{}
					δq2.AddRange(b1r)
					δq2.DifferenceInplace(done2)
					q2.UnionInplace(δq2)
					debugfKAdj("q2 += %s\t-> %s\n", δq2, q2)

					// continue with next right bucket until r1 coverage is complete
					if r1.hi_ <= b1.hi_ {
						break
					}
					lo1 = b1.hi_ + 1
				}
			}
			q1.Clear()

			for _, r2 := range q2.AllRanges() {
				lo2 := r2.lo
				for {
					b2 := t2.xkv.Get(lo2)
					debugfKAdj("  b2: %s\n", b2)
					for k_ := range keys {
						if b2.lo <= k_ && k_ <= b2.hi_ {
							adj2.Add(k_)
							debugfKAdj("    adj2 += %s\t-> %s\n", kstr(k_), adj2)
						}
					}
					b2r := KeyRange{b2.lo, b2.hi_}
					done2.AddRange(b2r)
					// q1 |= (b2.keyrange \ done1)
					δq1 := &RangedKeySet{}
					δq1.AddRange(b2r)
					δq1.DifferenceInplace(done1)
					q1.UnionInplace(δq1)
					debugfKAdj("q1 += %s\t-> %s\n", δq1, q1)

					// continue with next right bucket until r2 coverage is complete
					if r2.hi_ <= b2.hi_ {
						break
					}
					lo2 = b2.hi_ + 1
				}
			}
			q2.Clear()
		}

		adj := setKey{}; adj.Update(adj1); adj.Update(adj2)
		kadj[k] = adj
	}

	return kadj
}


// xverifyΔBTail_Update verifies how ΔBTail handles ZODB update for a tree with changes in between t1->t2.
//
// Note: this test verifies only single treediff step of ΔBtail.Update.
//       the cycling phase of update, that is responsible to recompute older
//       entries when key coverage grows, is exercised by
//       xverifyΔBTail_rebuild.
func xverifyΔBTail_Update(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, t1, t2 *tTreeCommit) {
	// verify transition at1->at2 for all initial states of tracked {keys} from kv1 + kv2 + ∞

	t.Run(fmt.Sprintf("Update/%s→%s", t1.tree, t2.tree), func(t *testing.T) {
		allKeys := allTestKeys(t1, t2)
		allKeyv := allKeys.SortedElements()

		kadj12 := KAdj(t1, t2)

		// verify at1->at2 for all combination of initial tracked keys.
		for kidx := range IntSets(len(allKeyv)) {
			keys := setKey{}
			for _, idx := range kidx {
				keys.Add(allKeyv[idx])
			}

			// this t.Run allocates and keeps too much memory in -verylong
			// also it is not so useful as above "Update/t1->t2"
			//t.Run(fmt.Sprintf(" track=%s", keys), func(t *testing.T) {
				xverifyΔBTail_Update1(t, subj, db, treeRoot, t1,t2, keys, kadj12)
			//})
		}
	})
}

// xverifyΔBTail_Update1 verifies how ΔBTail handles ZODB update at1->at2 from initial
// tracked state defined by initialTrackedKeys.
func xverifyΔBTail_Update1(t *testing.T, subj string, db *zodb.DB, treeRoot zodb.Oid, t1,t2 *tTreeCommit, initialTrackedKeys setKey, kadj KAdjMatrix) {
	X := exc.Raiseif
	//t.Logf("\n>>> Track=%s\n", initialTrackedKeys)

	δZ  := t2.δZ
	d12 := t2.δxkv

	var TrackedδZ setKey = nil
	var kadjTrackedδZ setKey = nil
	var δT, δTok map[Key]Δstring = nil, nil
	δZset := setOid{}
	for _, oid := range δZ.Changev {
		δZset.Add(oid)
	}

	// badf queues error message to be reported on return.
	var badv []string
	badf := func(format string, argv ...interface{}) {
		badv = append(badv, fmt.Sprintf(format, argv...))
	}
	defer func() {
		if badv != nil || t.Failed() {
			emsg := fmt.Sprintf("%s  ; tracked=%v :\n\n", subj, initialTrackedKeys)
			emsg += fmt.Sprintf("d12:  %v\nδTok: %v\nδT:   %v\n\n", d12, δTok, δT)
			emsg += fmt.Sprintf("δZ:               %v\n", δZset)
			emsg += fmt.Sprintf("Tracked^δZ:       %v\n", TrackedδZ)
			emsg += fmt.Sprintf("kadj[Tracked^δZ]: %v\n", kadjTrackedδZ)
			emsg += fmt.Sprintf("kadj: %v\n\n", kadj)
			emsg += strings.Join(badv, "\n")
			emsg += "\n"

			t.Fatal(emsg)
		}
	}()


	// δbtail @at1 with initial tracked set
	δbtail := NewΔBtail(t1.at, db)
	xtrackKeys(δbtail, t1, initialTrackedKeys)

	// TrackedδZ = Tracked ^ δZ  (i.e. a tracked node has changed, or its coverage was changed)
	TrackedδZ = setKey{}
	for k := range initialTrackedKeys {
		leaf1 := t1.xkv.Get(k)
		oid1 := leaf1.oid
		if oid1 == zodb.InvalidOid { // embedded bucket
			oid1 = leaf1.parent.oid
		}
		leaf2 := t2.xkv.Get(k)
		oid2 := leaf2.oid
		if oid2 == zodb.InvalidOid { // embedded bucket
			oid2 = leaf2.parent.oid
		}
		if δZset.Has(oid1) || δZset.Has(oid2) || (KeyRange{leaf1.lo,leaf1.hi_} != KeyRange{leaf2.lo,leaf2.hi_}) {
			TrackedδZ.Add(k)
		}
	}

	kadjTrackedδZ = setKey{} // kadj[Tracked^δZ]  (all keys adjacent to tracked^δZ)
	for k := range TrackedδZ {
		kadjTrackedδZ.Update(kadj[k])
	}


	// assert TrackedδZ ∈ kadj[TrackedδZ]
	trackNotInKadj := TrackedδZ.Difference(kadjTrackedδZ)
	if len(trackNotInKadj) > 0 {
		badf("BUG: Tracked^δZ ∉ kadj[Tracked^δZ]  ; extra=%v", trackNotInKadj)
		return
	}

	//              k ∈ d12
	// k ∈ δT  <=>
	//              k ∈   U kadj[·]
	//                   ·∈tracking^δZ
	δTok = map[Key]Δstring{} // d12[all keys that should be present in δT]
	for k,δv := range d12 {
		if kadjTrackedδZ.Has(k) {
			δTok[k] = δv
		}
	}

	ø := PPTreeSubSet{}

	// trackSet1 = xkv1[tracked1]
	// trackSet2 = xkv2[tracked2]  ( = xkv2[kadj[tracked1]]
	trackSet1, tkeyCov1 := t1.xkv.trackSetWithCov(initialTrackedKeys)
	trackSet2, tkeyCov2 := t2.xkv.trackSetWithCov(initialTrackedKeys.Union(kadjTrackedδZ))

	// verify δbtail.trackSet against @at1
	δbtail.assertTrack(t, "1", ø, trackSet1)

	// δB <- δZ
	//
	// also call _Update1 directly to verify δtkeycov return from treediff
	// the result of Update and _Update1 should be the same since δbtail is initially empty.
	δbtail_ := δbtail.Clone()
	δB1, err := δbtail_._Update1(δZ);  X(err)	// XXX don't compute treediff twice

	// assert tkeyCov1 ⊂ tkeyCov2
	dkeycov12 := tkeyCov1.Difference(tkeyCov2)
	if !dkeycov12.Empty() {
		t.Errorf("BUG: tkeyCov1 ⊄ tkeyCov2:\n\ttkeyCov1: %s\n\ttkeyCov2: %s\n\ttkeyCov1 \\ tkeyCov2: %s", tkeyCov1, tkeyCov2, dkeycov12)
	}

	// assert δtkeycov == δ(tkeyCov1, tkeyCov2)
	δtkeycovOK := tkeyCov2.Difference(tkeyCov1)
	δtkeycov := &RangedKeySet{}
	if __, ok := δB1.ByRoot[treeRoot]; ok {
		δtkeycov = __.δtkeycov1
	}
	if !δtkeycov.Equal(δtkeycovOK) {
		badf("δtkeycov wrong:\nhave: %s\nwant: %s", δtkeycov, δtkeycovOK)
	}

	δB, err  := δbtail.Update(δZ);     X(err)
	// XXX assert δB.roots == δTKeyCov roots
	// XXX assert δBtail[root].vδT = δBtail_[root].vδT

	if δB.Rev != δZ.Tid {
		badf("δB: rev != δZ.Tid  ; rev=%s  δZ.Tid=%s", δB.Rev, δZ.Tid)
		return
	}

	// verify δbtail.trackSet  against @at2
	δbtail.assertTrack(t, "2", trackSet2, ø)


	// assert δB.ByRoot == {treeRoot -> ...}  if δTok != ø
	//                  == ø                  if δTok == ø
	rootsOK := setOid{}
	if len(δTok) > 0 {
		rootsOK.Add(treeRoot)
	}
	roots := setOid{}
	for root := range δB.ΔByRoot {
		roots.Add(root)
	}
	if !reflect.DeepEqual(roots, rootsOK) {
		badf("δB: roots != rootsOK  ; roots=%v  rootsOK=%v", roots, rootsOK)
	}
	_, inδB := δB.ΔByRoot[treeRoot]
	if !inδB {
		return
	}


	// δT <- δB
	δToid := δB.ΔByRoot[treeRoot]   // {} k -> δoid
	δT     = XGetδKV(t1,t2, δToid) // {} k -> δ(ZBlk(oid).data)

	// δT must be subset of d12.
	// changed keys, that are
	// - in tracked set	 -> must be present in δT
	// - outside tracked set -> may  be present in δT  (kadj gives exact answer)

	// δT is subset of d12
	for _, k := range sortedKeys(δT) {
		_, ind12 := d12[k]
		if !ind12 {
			badf("δT[%v] ∉  d12", k)
		}
	}

	// k ∈ tracked set ->  must be present in δT
	// k ∉ tracked set ->  may  be present in δT  (kadj gives exact answer)
	for _, k := range sortedKeys(d12) {
		_, inδT   := δT[k]
		_, inδTok := δTok[k]
		if inδT && !inδTok {
			badf("δT[%v] ∉  δTok", k)
		}

		if !inδT && inδTok {
			badf("δT    ∌  δTok[%v]", k)
		}

		if inδT {
			if δT[k] != d12[k] {
				badf("δT[%v] ≠  δTok[%v]", k, k)
			}
		}
	}
}

// assertTrack verifies state of .trackSet and ΔTtail.trackNew.
// it assumes that only one tree root is being tracked.
// XXX place
func (δBtail *ΔBtail) assertTrack(t *testing.T, subj string, trackSetOK PPTreeSubSet, trackNewOK PPTreeSubSet) {
	t.Helper()
	if !δBtail.trackSet.Equal(trackSetOK) {
		t.Errorf("%s: trackSet:\n\thave: %v\n\twant: %v", subj, δBtail.trackSet, trackSetOK)
	}

	roots := setOid{}
	for root := range δBtail.vδTbyRoot {
		roots.Add(root)
	}

	nrootsOK := 1
	if trackSetOK.Empty() && trackNewOK.Empty() {
		nrootsOK = 0
	}
	if len(roots) != nrootsOK {
		t.Errorf("%s: len(vδTbyRoot) != %d   ; roots=%v", subj, nrootsOK, roots)
		return
	}
	if nrootsOK == 0 {
		return
	}

	root := roots.Elements()[0]

	δTtail := δBtail.vδTbyRoot[root]

	trackNewRootsOK := setOid{}
	if !trackNewOK.Empty() {
		trackNewRootsOK.Add(root)
	}

	if !δBtail.trackNewRoots.Equal(trackNewRootsOK) {
		t.Errorf("%s: trackNewRoots:\n\thave: %v\n\twant: %v", subj, δBtail.trackNewRoots, trackNewRootsOK)
	}

	if !δTtail.trackNew.Equal(trackNewOK) {
		t.Errorf("%s: vδT.trackNew:\n\thave: %v\n\twant: %v", subj, δTtail.trackNew, trackNewOK)
	}
}

// xverifyΔBTail_rebuild verifies ΔBtail.rebuild during t0->t1->t2 transition.
//
// t0->t1 exercises from-scratch rebuild,
// t1->t2 further exercises incremental rebuild.
//
// It also exercises rebuild phase of ΔBtail.Update.
func xverifyΔBTail_rebuild(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, t0, t1, t2 *tTreeCommit) {
	t.Run(fmt.Sprintf("rebuild/%s→%s", t0.tree, t1.tree), func(t *testing.T) {
		tAllKeys := allTestKeys(t0, t1, t2)
		tAllKeyv := tAllKeys.SortedElements()

		// tid -> "at_i"
		xat := map[zodb.Tid]string{
			t0.at: "at0",
			t1.at: "at1",
			t2.at: "at2",
		}

		//fmt.Printf("@%s:  %v\n", xat[t0.at], t0.xkv.Flatten())
		//fmt.Printf("@%s:  %v\n", xat[t1.at], t1.xkv.Flatten())
		//fmt.Printf("@%s:  %v\n", xat[t2.at], t2.xkv.Flatten())

		kadj10 := KAdj(t1,t0, allTestKeys(t0,t1,t2))
		kadj21 := KAdj(t2,t1, allTestKeys(t0,t1,t2))
		kadj12 := KAdj(t1,t2, allTestKeys(t0,t1,t2))

		// kadj210 = kadj10·kadj21
		kadj210 := kadj10.Mul(kadj21)

		ø := PPTreeSubSet{}

		// verify t0 -> t1 Track(keys1) Rebuild -> t2 Track(keys2) Rebuild
		// for all combinations of keys1 and keys2
		for k1idx := range IntSets(len(tAllKeyv)) {
			keys1 := setKey{}
			for _, idx1 := range k1idx {
				keys1.Add(tAllKeyv[idx1])
			}

			// δkv1_1 = t1.δxkv / kadj10(keys1)
			keys1_0 := kadj10.Map(keys1)
			δkv1_1 := map[Key]Δstring{}
			for k := range keys1_0 {
				δv, ok := t1.δxkv[k]
				if ok {
					δkv1_1[k] = δv
				}
			}

			Tkeys1   := t1.xkv.trackSet(keys1)
			Tkeys1_0 := t1.xkv.trackSet(keys1_0)

			t.Run(fmt.Sprintf(" T%s;R", keys1), func(t *testing.T) {
				δbtail := NewΔBtail(t0.at, db)

				// assert trackSet=ø, trackNew=ø, vδB=[]
				δbtail.assertTrack(t, "@at0", ø, ø)
				assertΔTtail(t, "@at0", δbtail, t0, treeRoot, xat,
					/*vδT=ø*/)

				xverifyΔBTail_rebuild_U(t, δbtail, treeRoot, t0, t1, xat,
					/*trackSet=*/ø,
					/*vδT=ø*/)
				xverifyΔBTail_rebuild_TR(t, δbtail, t1, treeRoot, xat,
					// after Track(keys1)
					keys1,
					/*trackSet=*/ ø,
					/*trackNew=*/ Tkeys1,

					// after rebuild
					/*trackSet=*/ Tkeys1_0,
					/*vδT=*/ δkv1_1)

				t.Run((" →" + t2.tree), func(t *testing.T) {
					// keys1R2 is full set of keys that should become tracked after
					// Update()  (which includes rebuild)
					keys1R2 := kadj12.Map(keys1)
					for {
						keys1R2_ := kadj210.Map(keys1R2)
						if keys1R2.Equal(keys1R2_) {
							break
						}
						keys1R2 = keys1R2_
					}

					// δkvX_k1R2 = tX.δxkv / keys1R2
					δkv1_k1R2 := map[Key]Δstring{}
					δkv2_k1R2 := map[Key]Δstring{}
					for k := range keys1R2 {
						δv1, ok := t1.δxkv[k]
						if ok {
							δkv1_k1R2[k] = δv1
						}
						δv2, ok := t2.δxkv[k]
						if ok {
							δkv2_k1R2[k] = δv2
						}
					}

					Tkeys1R2  := t2.xkv.trackSet(keys1R2)

					xverifyΔBTail_rebuild_U(t, δbtail, treeRoot, t1, t2, xat,
						/*trackSet=*/ Tkeys1R2,
						/*vδT=*/ δkv1_k1R2, δkv2_k1R2)

					// tRestKeys2      = tAllKeys - keys1
					// reduce that to  = tAllKeys - keys1R2   in short mode
					// ( if key from keys2 already became tracked after Track(keys1) + Update,
					//   adding Track(that-key), is not adding much testing coverage to recompute paths )
					var tRestKeys2 setKey
					if testing.Short() {
						tRestKeys2 = tAllKeys.Difference(keys1R2)
					} else {
						tRestKeys2 = tAllKeys.Difference(keys1)
					}

					tRestKeyv2 := tRestKeys2.SortedElements()
					for k2idx := range IntSets(len(tRestKeyv2)) {
						keys2 := setKey{}
						for _, idx2 := range k2idx {
							keys2.Add(tRestKeyv2[idx2])
						}

						// keys12R2 is full set of keys that should become tracked after
						// Track(keys2) + rebuild
						keys12R2 := keys1R2.Union(keys2)
						for {
							keys12R2_ := kadj210.Map(keys12R2)
							if keys12R2.Equal(keys12R2_) {
								break
							}
							keys12R2 = keys12R2_
						}

						Tkeys2    := t2.xkv.trackSet(keys2)
						Tkeys12R2 := t2.xkv.trackSet(keys12R2)
/*
						fmt.Printf("\n\n\nKKK\nkeys1=%s  keys2=%s\n", keys1, keys2)
						fmt.Printf("keys1R2:  %s\n", keys1R2)
						fmt.Printf("keys12R2: %s\n", keys12R2)

						fmt.Printf("t0.xkv: %v\n", t0.xkv)
						fmt.Printf("t1.xkv: %v\n", t1.xkv)
						fmt.Printf("t2.xkv: %v\n", t2.xkv)
						fmt.Printf("kadj21: %v\n", kadj21)
						fmt.Printf("kadj12: %v\n", kadj12)
						fmt.Printf("Tkeys2   -> %s\n", Tkeys2)
						fmt.Printf("Tkeys1R2 -> %s\n", Tkeys1R2)
						fmt.Printf("Tkeys2 \\ Tkeys1R2  -> %s\n", Tkeys2.Difference(Tkeys1R2))
						fmt.Printf("\n\n\n")
*/


						// δkvX_k12R2 = tX.δxkv / keys12R2
						δkv1_k12R2 := make(map[Key]Δstring, len(t1.δxkv))
						δkv2_k12R2 := make(map[Key]Δstring, len(t2.δxkv))
						for k := range keys12R2 {
							δv1, ok := t1.δxkv[k]
							if ok {
								δkv1_k12R2[k] = δv1
							}
							δv2, ok := t2.δxkv[k]
							if ok {
								δkv2_k12R2[k] = δv2
							}
						}

						// t.Run is expensive at this level of nest
						//t.Run(" T"+keys2.String()+";R", func(t *testing.T) {
							δbtail_ := δbtail.Clone()
							xverifyΔBTail_rebuild_TR(t, δbtail_, t2, treeRoot, xat,
								// after Track(keys2)
								keys2,
								/*trackSet*/ Tkeys1R2,
								/*trackNew*/ Tkeys2.Difference(
									// trackNew should not cover ranges that are
									// already in trackSet
									Tkeys1R2),

								// after rebuild
								/* trackSet=*/ Tkeys12R2,
								/*vδT=*/ δkv1_k12R2, δkv2_k12R2)
						//})
					}
				})
			})
		}
	})
}

// xverifyΔBTail_rebuild_U verifies ΔBtail state after Update(ti->tj).
func xverifyΔBTail_rebuild_U(t *testing.T, δbtail *ΔBtail, treeRoot zodb.Oid, ti, tj *tTreeCommit, xat map[zodb.Tid]string, trackSet PPTreeSubSet, vδTok ...map[Key]Δstring) {
	t.Helper()
	X := exc.Raiseif
	ø := PPTreeSubSet{}

	subj := fmt.Sprintf("after Update(@%s→@%s)", xat[ti.at], xat[tj.at])

	// Update ati -> atj
	δB, err := δbtail.Update(tj.δZ);  X(err)
	δbtail.assertTrack(t, subj, trackSet, ø)
	assertΔTtail(t, subj, δbtail, tj, treeRoot, xat, vδTok...)

	// assert δB = vδTok[-1]
	var δT, δTok map[Key]Δstring
	if l := len(vδTok); l > 0 {
		δTok = vδTok[l-1]
	}
	if len(δTok) == 0 {
		δTok = nil
	}
	δrootsOK := 1
	if δTok == nil {
		δrootsOK = 0
	}

	δroots := setOid{}
	for root := range δbtail.vδTbyRoot {
		δroots.Add(root)
	}
	δToid, ok := δB.ΔByRoot[treeRoot]
	if ok {
		δT = XGetδKV(ti, tj, δToid)
	}
	if δB.Rev != tj.at {
		t.Errorf("%s: δB.Rev: have %s  ; want %s", subj, δB.Rev, tj.at)
	}
	if len(δB.ΔByRoot) != δrootsOK {
		t.Errorf("%s: len(δB.ΔByRoot) != %d  ; δroots=%v", subj, δrootsOK, δroots)
	}
	if !δTEqual(δT, δTok) {
		t.Errorf("%s: δB.ΔBByRoot[%s]:\nhave: %v\nwant: %v", subj, treeRoot, δT, δTok)
	}
}

// xverifyΔBTail_rebuild_TR verifies ΔBtail state after Track(keys) + rebuild.
func xverifyΔBTail_rebuild_TR(t *testing.T, δbtail *ΔBtail, tj *tTreeCommit, treeRoot zodb.Oid, xat map[zodb.Tid]string, keys setKey, trackSet PPTreeSubSet, trackNew, trackSetAfterRebuild PPTreeSubSet, vδTok ...map[Key]Δstring) {
	t.Helper()
	ø := PPTreeSubSet{}

	// Track(keys)
	xtrackKeys(δbtail, tj, keys)

	subj := fmt.Sprintf("@%s: after Track%v", xat[tj.at], keys)
	δbtail.assertTrack(t, subj, trackSet, trackNew)

	δbtail.rebuildAll()

	subj += " + rebuild"
	δbtail.assertTrack(t, subj, trackSetAfterRebuild, ø)

	// XXX verify Get -> XXX assertΔTtail ?

	// verify δbtail.vδTbyRoot[treeRoot]
	assertΔTtail(t, subj, δbtail, tj, treeRoot, xat, vδTok...)
}

// assertΔTtail verifies state of ΔTtail that corresponds to treeRoot in δbtail.
// it also verifies that δbtail.vδBroots matches ΔTtail data.
func assertΔTtail(t *testing.T, subj string, δbtail *ΔBtail, tj *tTreeCommit, treeRoot zodb.Oid, xat map[zodb.Tid]string, vδTok ...map[Key]Δstring) {
	t.Helper()
	// XXX +KVAtTail, +lastRevOf

	l := len(vδTok)
	var vatOK  []zodb.Tid
	var vδTok_ []map[Key]Δstring
	at2t := map[zodb.Tid]*tTreeCommit{tj.at: tj}
	t0 := tj
	for i := 0; i<l; i++ {
		// empty vδTok entries means they should be absent in vδT
		if δTok := vδTok[l-i-1]; len(δTok) != 0 {
			vatOK  = append([]zodb.Tid{t0.at}, vatOK...)
			vδTok_ = append([]map[Key]Δstring{δTok}, vδTok_...)
		}
		t0 = t0.prev
		at2t[t0.at] = t0
	}
	vδTok = vδTok_
	δTtail, ok := δbtail.vδTbyRoot[treeRoot]
	var vδToid []ΔTree
	if ok {
		vδToid = δTtail.vδT
	}

	l = len(vδToid)
	var vat []zodb.Tid
	var vδT []map[Key]Δstring
	atPrev := t0.at
	for _, δToid := range vδToid {
		vat = append(vat, δToid.Rev)
		δT := XGetδKV(at2t[atPrev], at2t[δToid.Rev], δToid.ΔKV) // {} k -> δ(ZBlk(oid).data)
		vδT = append(vδT, δT)
		atPrev = δToid.Rev
	}

	var vatδB []zodb.Tid // δbtail.vδBroots/treeRoot
	for _, δBroots := range δbtail.vδBroots {
		if δBroots.ΔRoots.Has(treeRoot) {
			vatδB = append(vatδB, δBroots.Rev)
		}
	}

	tok := tidvEqual(vat,   vatOK) && vδTEqual(vδT, vδTok)
	bok := tidvEqual(vatδB, vatOK)
	if !(tok && bok) {
		emsg := fmt.Sprintf("%s: vδT:\n", subj)
		have := ""
		for i := 0; i<len(vδT); i++ {
			have += fmt.Sprintf("\n\t@%s: %v", xat[vat[i]], vδT[i])
		}
		emsg += fmt.Sprintf("have: %s\n", have)

		if !tok {
			want := ""
			for i := 0; i<len(vδTok); i++ {
				want += fmt.Sprintf("\n\t@%s: %v", xat[vatOK[i]], vδTok[i])
			}
			emsg += fmt.Sprintf("want: %s\n", want)
		}

		if !bok {
			vδb_root := ""
			for i := 0; i<len(vatδB); i++ {
				vδb_root += fmt.Sprintf("\n\t@%s", xat[vatδB[i]])
			}
			emsg += fmt.Sprintf("vδb/root: %s\n", vδb_root)
		}

		t.Error(emsg)
	}
}

// xtrackKeys issues δbtail.Track requests for tree[keys].
// XXX place
func xtrackKeys(δbtail *ΔBtail, t *tTreeCommit, keys setKey) {
	X := exc.Raiseif
	head := δbtail.Head()
	if head != t.at {
		panicf("BUG: δbtail.head: %s  ; t.at: %s", head, t.at)
	}

	for k := range keys {
		// NOTE: if tree is deleted - the following adds it to tracked
		// set with every key being a hole. This aligns with the
		// following situation
		//
		//	T1 -> ø -> T2
		//
		// where after T1->ø, even though the tree becomes deleted, its root
		// continues to be tracked and all keys migrate to holes in the
		// tracking set. By aligning initial state to the same as after
		// T1->ø, we test what will happen on ø->T2.
		b := t.xkv.Get(k)
		err := δbtail.track(k, b.Path()); X(err)
	}
}

// xverifyΔBTail_GetAt verifies δBtail.Get on series of vt ZODB changes.
// XXX
// XXX kill
/*
func ___xverifyΔBTail_GetAt(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, vt ...*tTreeCommit) {
	subj := vt[0].tree
	for _, t := range vt[1:] {
		subj += "→" + t.tree
	}

	t.Run(fmt.Sprintf("Get/%s", subj), func(t *testing.T) {
		// tid -> "at_i"
		xat := map[zodb.Tid]string{}
		for i := range vt {
			xat[vt[i].at] = fmt.Sprintf("at%d", i)

			fmt.Printf("@%s:  %v\n", xat[vt[i].at], vt[i].xkv.Flatten())
		}

		tkeys := allTestKeys(vt...)
		tkeyv := tkeys.SortedElements()

		// verify  t1->t2-> ... ->tn Track(keys) Get(keys, @at)
		// for all combinations of tracked keys and at
		for kidx := range IntSets(len(tkeyv)) {
			keys := setKey{}
			for _, idx := range kidx {
				keys.Add(tkeyv[idx])
			}

			t.Run(fmt.Sprintf("track=%s", keys), func(t *testing.T) {
				xverifyΔBTail_GetAt1(t, db, treeRoot, vt, xat, keys)
			})
		}
	})
}

func xverifyΔBTail_GetAt1(t *testing.T, db *zodb.DB, treeRoot zodb.Oid, vt []*tTreeCommit, xat map[zodb.Tid]string, keys setKey) {
	X := exc.Raiseif

	// t1->t2-> ... -> tn
	δbtail := NewΔBtail(vt[0].at, db)
	for i := 1; i < len(vt); i++ {
		_, err := δbtail.Update(vt[i].δZ);  X(err)
	}

	// Track(keys)
	txn, ctx := transaction.New(context.Background())
	defer txn.Abort()
	zconn, err := db.Open(ctx, &zodb.ConnOptions{At: vt[len(vt)-1].at});  X(err)
	xtree, err := zconn.Get(ctx, treeRoot);	 X(err)
	ztree := xtree.(*Tree)

	for k := range keys {
		_, _, path, err := ZTreeGetBlkData(ctx, ztree, k);  X(err)
		err = δbtail.Track(k, path);  X(err)
	}

	// verify GetAt(k, @at) for all keys and @at
	for i := 1; i < len(vt); i++ {
		at := vt[i].at
		for _, k := range keys.SortedElements() {
			vOid, ok, rev, revExact, err := δbtail.GetAt(ctx, ztree, k, at);  X(err)
			v := xzgetBlkDataAt(db, vOid, rev)

			v_, ok_ := vt[i].xkv.Get(k).kv[k]
			rev_, revExact_ := vt[i].at, false
			for j := i-1; j >= 0; j-- {
				v__ := vt[j].xkv.Get(k).kv[k]
				if v__ != v_ {
					rev_ = vt[j+1].at
					revExact_ = true
					break
				}
				rev_ = vt[j].at
			}

			if v == "" { v = DEL }
			if v_ == "" { v_ = DEL }

			if !(v == v_ && ok == ok_ && rev == rev_ && revExact == revExact_) {
				t.Errorf("Get(%d, @%s) ->\nhave: %s, %v, @%s, %v\nwant: %s, %v, @%s, %v",
					k, xat[at],
					v, ok, xat[rev], revExact,
					v_, ok_, xat[rev_], revExact_)
			}
		}
	}
}
*/


// ----------------------------------------

// ΔBTestEntry represents one entry in ΔBTail tests.
type ΔBTestEntry struct {
	tree   string      // next tree topology
	kadjOK KAdjMatrix  // adjacency matrix against previous case (optional)
	flags  ΔBTestFlags
}

type ΔBTestFlags int
const ΔBTest_SkipUpdate  ΔBTestFlags = 1 // skip verifying Update  for this test entry
const ΔBTest_SkipRebuild ΔBTestFlags = 2 // skip verifying rebuild for this test entry

// ΔBTest converts xtest into ΔBTestEntry.
// xtest can be string|ΔBTestEntry.
func ΔBTest(xtest interface{}) ΔBTestEntry {
	var test ΔBTestEntry
	switch xtest := xtest.(type) {
	case string:
		test.tree   = xtest
		test.kadjOK = nil
		test.flags  = 0
	case ΔBTestEntry:
		test = xtest
	default:
		panicf("BUG: ΔBTest: bad type %T", xtest)
	}
	return test
}


// testΔBTail verifies ΔBTail on sequence of tree topologies coming from testq.
func testΔBTail(t_ *testing.T, testq chan ΔBTestEntry) {
	t := tNewTreeEnv(t_)

	var t0 *tTreeCommit
	for test := range testq {
		t1 := t.Head()
		t2 := t.CommitTree(test.tree)

		subj := fmt.Sprintf("%s -> %s", t1.tree, t2.tree)
		//t.Logf("\n\n\n**** %s ****\n\n", subj)

		// KAdj
		if kadjOK := test.kadjOK; kadjOK != nil {
			t.Run(fmt.Sprintf("KAdj/%s→%s", t1.tree, t2.tree), func(t *testing.T) {
				kadj := KAdj(t1, t2)
				if !reflect.DeepEqual(kadj, kadjOK) {
					t.Fatalf("BUG: computed kadj is wrong:\nkadjOK: %v\nkadj  : %v\n\n", kadjOK, kadj)
				}
			})
		}

		// ΔBTail.Update
		if test.flags & ΔBTest_SkipUpdate == 0 {
			xverifyΔBTail_Update(t.T, subj, t.db, t.Root(), t1,t2)
		}

		// ΔBTail.rebuild
		if t0 != nil && (test.flags & ΔBTest_SkipRebuild == 0) {
			xverifyΔBTail_rebuild(t.T, t.db, t.Root(), t0,t1,t2)
		}

		t0, t1 = t1, t2
	}
}

// TestΔBTail verifies ΔBTail for explicitly provided tree topologies.
func TestΔBTail(t *testing.T) {
	// K is shorthand for setKey
	K := func(keyv ...Key) setKey {
		ks := setKey{}
		for _, k := range keyv { ks.Add(k) }
		return ks
	}
	// oo is shorthand for KeyMax
	const oo = KeyMax
	// A is shorthand for KAdjMatrix
	type A = KAdjMatrix
	// Δ is shorthand for ΔBTestEntry
	Δ := func(tree string, kadjOK A) (test ΔBTestEntry) {
		test.tree   = tree
		test.kadjOK = kadjOK
		return test
	}

	// test known cases going through tree1 -> tree2 -> ...
	testv := []interface{} {
		// start from non-empty tree to verify both ->empty and empty-> transitions
		"T/B1:a,2:b",

		// empty
		"T/B:",

		// +1
		Δ("T/B1:a",
			A{1:  K(1,oo),
			  oo: K(1,oo)}),

		// +2
		Δ("T/B1:a,2:b",
			A{1:  K(1,2,oo),
			  2:  K(1,2,oo),
			  oo: K(1,2,oo)}),

		// -1
		Δ("T/B2:b",
			A{1:  K(1,2,oo),
			  2:  K(1,2,oo),
			  oo: K(1,2,oo)}),

		// 2: b->c
		Δ("T/B2:c",
			A{2:  K(2,oo),
			  oo: K(2,oo)}),

		// +1 in new bucket (to the left)
		Δ("T2/B1:a-B2:c",
			A{1:  K(1,2,oo),
			  2:  K(1,2,oo),
			  oo: K(1,2,oo)}),

		// +3 in new bucket (to the right)
		Δ("T2,3/B1:a-B2:c-B3:c",
			A{1:  K(1),
			  2:  K(2,3,oo),
			  3:  K(2,3,oo),
			  oo: K(2,3,oo)}),

		// bucket split; +3 in new bucket
		"T/B1:a,2:b",
		Δ("T2/B1:a-B2:b,3:c",
			A{1:  K(1,2,3,oo),
			  2:  K(1,2,3,oo),
			  3:  K(1,2,3,oo),
			  oo: K(1,2,3,oo)}),

		// bucket split; +3 in new bucket; +4 +5 in another new bucket
		// everything becomes tracked because original bucket had [-∞,∞) coverage
		"T/B1:a,2:b",
		Δ("T2,4/B1:a-B2:b,3:c-B4:d,5:e",
			A{1:  K(1,2,3,4,5,oo),
			  2:  K(1,2,3,4,5,oo),
			  3:  K(1,2,3,4,5,oo),
			  4:  K(1,2,3,4,5,oo),
			  5:  K(1,2,3,4,5,oo),
			  oo: K(1,2,3,4,5,oo)}),

		// reflow of keys: even if tracked={1}, changes to all B nodes need to be rescanned:
		// +B12 forces to look in -B23 which adds -3 into δ, which
		// forces to look into +B34 and so on.
		"T2,4,6/B1:a-B2:b,3:c-B4:d,5:e-B6:f,7:g",
		Δ("T3,5,7/B1:g,2:f-B3:e,4:d-B5:c,6:b-B7:a",
			A{1:  K(1,2,3,4,5,6,7,oo),
			  2:  K(1,2,3,4,5,6,7,oo),
			  3:  K(1,2,3,4,5,6,7,oo),
			  4:  K(1,2,3,4,5,6,7,oo),
			  5:  K(1,2,3,4,5,6,7,oo),
			  6:  K(1,2,3,4,5,6,7,oo),
			  7:  K(1,2,3,4,5,6,7,oo),
			  oo: K(1,2,3,4,5,6,7,oo)}),

		// reflow of keys for rebuild: even if tracked1={}, tracked2={1}, changes to
		// all A/B/C nodes need to be rescanned. Contrary to the above case the reflow
		// is not detectable at separate diff(A,B) and diff(B,C) runs.
		"T3,5,7/B1:a,2:b-B3:c,4:d-B5:e,6:f-B7:g,8:h",
		"T/B1:b",
		"T2,4,6/B1:a-B2:b,3:c-B4:d,5:e-B6:f,7:g",
		// similar situation where rebuild has to detect reflow in between non-neighbour trees
		"T3,6/B1:a,2:b-B3:c,4:d-B6:f,7:g",
		"T4,7/B1:b-B4:d,5:e-B7:g,8:h",
		"T2,5,8/B1:a-B2:b,3:c-B5:e,6:f-B8:h,9:i",

		// depth=2; bucket split; +3 in new bucket; left T remain
		// _unchanged_ even though B under it is modified.
		"T/T/B1:a,2:b",
		Δ("T2/T-T/B1:a-B2:b,3:c",
			A{1:  K(1,2,3,oo),
			  2:  K(1,2,3,oo),
			  3:  K(1,2,3,oo),
			  oo: K(1,2,3,oo)}),

		// depth=2; like prev. case, but additional right arm with +4 +5 is added.
		"T/T/B1:a,2:b",
		Δ("T2,4/T-T-T/B1:a-B2:b,3:c-B4:d,5:e",
			A{1:  K(1,2,3,4,5,oo),
			  2:  K(1,2,3,4,5,oo),
			  3:  K(1,2,3,4,5,oo),
			  4:  K(1,2,3,4,5,oo),
			  5:  K(1,2,3,4,5,oo),
			  oo: K(1,2,3,4,5,oo)}),

		// depth=2; bucket split; +3 in new bucket; t0 and t1 split;
		// +right arm (T7/B45-B89).
		"T/T/B1:a,2:b",
		Δ("T4/T2-T7/B1:a-B2:b,3:c-B4:d,5:e-B8:h,9:i",
			A{1:  K(1,2,3,4,5,8,9,oo),
			  2:  K(1,2,3,4,5,8,9,oo),
			  3:  K(1,2,3,4,5,8,9,oo),
			  4:  K(1,2,3,4,5,8,9,oo),
			  5:  K(1,2,3,4,5,8,9,oo),
			  8:  K(1,2,3,4,5,8,9,oo),
			  9:  K(1,2,3,4,5,8,9,oo),
			  oo: K(1,2,3,4,5,8,9,oo)}),


		// 2 reflow to right B neighbour; 8 splits into new B; δ=ø
		"T3/B1:a,2:b-B4:d,8:h",
		"T2,5/B1:a-B2:b,4:d-B8:h",

		// case where kadj does not grow too much as leafs coverage remains stable
		"T4,8/B1:a,2:b-B5:d,6:e-B10:g,11:h",
		Δ("T4,8/B2:b,3:c-B6:e,7:f-B11:h,12:i",
			A{1:  K(1,2,3),
			  2:  K(1,2,3),
			  3:  K(1,2,3),
			  5:  K(5,6,7),
			  6:  K(5,6,7),
			  7:  K(5,6,7,),
			  10: K(10,11,12,oo),
			  11: K(10,11,12,oo),
			  12: K(10,11,12,oo),
			  oo: K(10,11,12,oo)}),


		// tree deletion
		// having ø in the middle of the test cases exercises all:
		// * `ø -> Tree ...`		(tree is created anew),
		// * `... Tree -> ø`		(tree is deleted), and
		// * `Tree -> ø -> Tree`	(tree is deleted and then recreated)
		DEL,

		// tree rotation
		"T3/B2:b-B3:c,4:d",
		"T5/T3-T7/B2:a-B3:a,4:a-B6:a-B8:a",

		// found by AllStructs ([1] is not changed, but because B1 is
		// unlinked and 1 migrates to other bucket, changes in that
		// other bucket must be included into δT)
		"T1,2/B0:e-B1:d-B2:g,3:a",
		"T1/B0:d-B1:d,2:d",
		// ----//---- with depth=2
		"T1,2/T-T-T/B0:a-B1:b-B2:c,3:d",
		"T1/T-T/B0:e-B1:b,2:f",

		// XXX depth=3 (to verify recursion and selecting which tree children to follow or not)


		// degenerate topology from ZODB tests
		// https://github.com/zopefoundation/ZODB/commit/6cd24e99f89b
		// https://github.com/zopefoundation/BTrees/blob/4.7.2-1-g078ba60/BTrees/tests/testBTrees.py#L20-L57
		"T4/T2-T/T-T-T6,10/B1:a-B3:b-T-T-T/T-B7:c-B11:d/B5:e",
		"T/B1:e,5:d,7:c,8:b,11:a", // -3 +8

		// was leading treegen to generate corrupt trees
		"T/T1/T-T/B0:g-B1:e,2:d,3:h",
		"T1/T-T3/B0:g-T-T/B1:e,2:d-B3:h",

		// was leading to wrongly computed trackSet2 due to top not
		// being tracked to tree root.
		"T/T1/B0:a-B1:b",
		"T/T1/T-T/B0:c-B1:d",

		// was leading to wrongly computed trackSet2: leaf bucket not
		// reparented to root.
		"T/T/B0:a",
		"T/B0:a",

		// δtkeycov grows due to change in parent tree only
		"T3/B1:a-B8:c",
		"T7/B1:a-B8:c",
		// ----//----
		"T3/B1:a,2:b-B8:c,9:d",
		"T7/B1:a,2:b-B8:c,9:d",
		// ----//---- depth=2
		"T3/T-T/B1:a,2:b-B8:c,9:d",
		"T7/T-T/B1:a,2:b-B8:c,9:d",
		// ----//---- found by AllStructs
		"T1,3/B0:d-B1:a-B3:d,4:g",
		"T1,4/B0:e-B1:a-B4:c",
		// ----//---- found by AllStructs
		"T2,4/T-T-T/T1-T-B4:f/T-T-B3:f/B0:h-B1:f",
		"T4/T-T/B3:f-T/B4:a",


		// ---- found by AllStructs ----

		// trackSet2 wrongly computed due to top not being tracked to tree root
		"T2/T1-T/B0:g-B1:b-T/B2:b,3:a",
		"T2/T1-T/T-T-B2:a/B0:c-B1:g",

		// unchanged node is reparented
		"T1/B0:c-B1:f",
		"T1/T-T/B0:c-T/B1:h",

		// SIGSEGV in ApplyΔ
		"T1/T-T2/T-B1:c-B2:c/B0:g",
		"T1/T-T/B0:g-T/B1:e",

		// trackSet corruption: oid is pointed by some .parent but is not present
		"T1/T-T/B0:g-T2/B1:h-B2:g",
		"T/T1/T-T2/B0:e-B1:f-B2:g",

		// ApplyΔ -> xunion: node is reachable from multiple parents
		// ( because xdifference did not remove common non-leaf node
		//   under which there were also other changed, but not initially
		//   tracked, node )
		"T4/T1-T/T-T2-B4:c/T-T-T/B0:f-B1:h-B2:g,3:b",
		"T1/T-T/T-T2/T-T-T/B0:f-B1:h-B2:f",
		// ----//----
		"T3/T1-T/T-T2-T/B0:b-T-T-B3:h/B1:e-B2:a",
		"T1/T-T4/T-T2-T/T-T-T-T/B0:b-B1:e-B2:a,3:c-B4:e",
		// ----//----
		"T/T1,3/T-T2-T4/B0:b-T-T-B3:g-B4:c/B1:b-B2:e",
		"T1,4/T-T-T/T-T2-B4:f/T-T-T/B0:h-B1:b-B2:h,3:a",

		"T2/B1:a-B7:g",
		"T2,8/B1:a-B7:g-B9:i",

		"T2/B1:a-B2:b", "T/B1:a,2:b",
		"T2,3/B1:a-B2:b-B3:c", "T/B1:a,2:b",
		"T2,3/B1:a-B2:c-B3:c", "T/B1:a,2:b",

		"T2/B1:a-B2:c", "T2,3/B1:a-B2:c-B3:c",

		"T2/B1:a-B3:c",
		Δ("T2/T-T4/B1:b-B3:d-B99:h",
			A{1:  K(1),
			  3:  K(3,99,oo),
			  99: K(3,99,oo),
			  oo: K(3,99,oo)}),
	}
	// direct  tree_i -> tree_{i+1} -> _{i+2} ...   plus
	// reverse ... tree_i <- _{i+1} <- _{i+2}
	kadjOK := ΔBTest(testv[len(testv)-1]).kadjOK
	for i := len(testv)-2; i >= 0; i-- {
		test := ΔBTest(testv[i])
		kadjOK, test.kadjOK = test.kadjOK, kadjOK
		testv = append(testv, test)
	}

	testq := make(chan ΔBTestEntry)
	go func() {
		defer close(testq)
		for _, test := range testv {
			testq <- ΔBTest(test)
		}
	}()
	testΔBTail(t, testq)

}

// TestΔBTailAllStructs verifies ΔBtail on tree topologies generated by AllStructs.
var (
	verylongFlag = flag.Bool("verylong", false, `switch tests to run in "very long" mode`)
	randseedFlag = flag.Int64("randseed", -1, `seed for random number generator`)
)
func TestΔBTailAllStructs(t *testing.T) {
	X := exc.Raiseif

	// considerations:
	// - maxdepth↑ better for testing (more tricky topologies)
	// - maxsplit↑ not so better for testing (leave s=1, max s=2)
	// - |kmin - kmax| affects N(variants) significantly
	//   -> keep key range small  (dumb increase does not help testing)
	// - N(keys) affects N(variants) significantly
	//   -> keep Nkeys reasonably small/medium  (dumb increase does not help testing)
	//
	// - spawning python subprocess is very slow (takes 300-500ms for
	//   imports; https://github.com/pypa/setuptools/issues/510)
	//   -> we spawn `treegen allstructs` once and use request/response approach.

	N := func(short, medium, long int) int {
		// -short
		if testing.Short() {
			return short
		}
		// -verylong
		if *verylongFlag {
			return long
		}
		// default
		return medium
	}
	maxdepth := N(2,  3,  4)
	maxsplit := N(1,  2,  2)
	n        := N(10,10,100)
	nkeys    := N(3,  5, 10)

	// server to generate AllStructs(kv, ...)
	sg, err := StartAllStructsSrv(); X(err)
	defer func() {
		err := sg.Close(); X(err)
	}()

	// random seed
	seed := *randseedFlag
	if seed == -1 {
		seed = time.Now().UnixNano()
	}
	rng := rand.New(rand.NewSource(seed))
	t.Logf("# maxdepth=%d maxsplit=%d nkeys=%d n=%d seed=%d", maxdepth, maxsplit, nkeys, n, seed)

	// generate (kv1, kv2, kv3) randomly

	// keysv1, keysv2 and keysv3 are random shuffle of IntSets
	var keysv1 [][]int
	var keysv2 [][]int
	var keysv3 [][]int
	for keys := range IntSets(nkeys) {
		keysv1 = append(keysv1, keys)
		keysv2 = append(keysv2, keys)
		keysv3 = append(keysv3, keys)
	}
	v := keysv1
	rng.Shuffle(len(v), func(i,j int) { v[i], v[j] = v[j], v[i] })
	v = keysv2
	rng.Shuffle(len(v), func(i,j int) { v[i], v[j] = v[j], v[i] })
	v = keysv3
	rng.Shuffle(len(v), func(i,j int) { v[i], v[j] = v[j], v[i] })

	// given random (kv1, kv2, kv3) generate corresponding set of random tree
	// topology sets (T1, T2, T3). Then iterate through T1->T2->T3->T1...
	// elements such that all right-directed triplets are visited and only once.
	// Test Update and rebuild on the generated tree sequences.
	vv := "abcdefgh"
	randv := func() string {
		i := rng.Intn(len(vv))
		return vv[i:i+1]
	}

	// the number of pairs    is 3·n^2
	// the number of triplets is   n^3
	//
	// limit n for emitted triplets, so that the amount of work for Update
	// and rebuild tests is approximately of the same order.
	nrebuild := int(math.Ceil(math.Pow(3*float64(n*n), 1./3)))
	// in non-short mode rebuild tests are exercising more keys variants, plus every test case
	// takes more time. Compensate for that as well.
	if !testing.Short() {
		nrebuild -= 3
	}

	testq := make(chan ΔBTestEntry)
	go func() {
		defer close(testq)
		for i := range keysv1 {
			keys1 := keysv1[i]
			keys2 := keysv2[i]
			keys3 := keysv3[i]

			kv1 := map[Key]string{}
			kv2 := map[Key]string{}
			kv3 := map[Key]string{}
			for _, k := range keys1 { kv1[Key(k)] = randv() }
			for _, k := range keys2 { kv2[Key(k)] = randv() }
			for _, k := range keys3 { kv3[Key(k)] = randv() }

			treev1, err1 := sg.AllStructs(kv1, maxdepth, maxsplit, n, rng.Int63())
			treev2, err2 := sg.AllStructs(kv2, maxdepth, maxsplit, n, rng.Int63())
			treev3, err3 := sg.AllStructs(kv3, maxdepth, maxsplit, n, rng.Int63())
			err := xerr.Merge(err1, err2, err3)
			if err != nil {
				t.Fatal(err)
			}

			emit := func(tree string, flags ΔBTestFlags) {
				// skip emitting this entry if both Update and
				// Rebuild are requested to be skipped.
				if flags == (ΔBTest_SkipUpdate | ΔBTest_SkipRebuild) {
					return
				}
				testq <- ΔBTestEntry{tree, nil, flags}
			}

			URSkipIf := func(ucond, rcond bool) ΔBTestFlags {
				var flags ΔBTestFlags
				if ucond {
					flags |= ΔBTest_SkipUpdate
				}
				if rcond {
					flags |= ΔBTest_SkipRebuild
				}
				return flags
			}

			for j := range treev1 {
				for k := range treev2 {
					for l := range treev3 {
						// limit rebuild to subset of tree topologies,
						// because #(triplets) grow as n^3. See nrebuild
						// definition above for details.
						norebuild := (j >= nrebuild ||
						              k >= nrebuild ||
							      l >= nrebuild)

						// C_{l-1} -> Aj   (pair first seen on k=0)
						emit(treev1[j], URSkipIf(k != 0, norebuild))

						// Aj -> Bk   (pair first seen on l=0)
						emit(treev2[k], URSkipIf(l != 0, norebuild))

						// Bk -> Cl   (pair first seen on j=0)
						emit(treev3[l], URSkipIf(j != 0, norebuild))
					}
				}
			}
		}
	}()

	testΔBTail(t, testq)
}


func TestΔBtailForget(t_ *testing.T) {
	t := tNewTreeEnv(t_)
	X := exc.Raiseif

	t0 := t.CommitTree("T/B:")
	t1 := t.CommitTree("T/B1:a")
	t2 := t.CommitTree("T2/B1:a-B2:b")
	t3 := t.CommitTree("T/B2:b")

	δbtail := NewΔBtail(t0.at, t.db)
	_, err := δbtail.Update(t1.δZ);  X(err)
	_, err  = δbtail.Update(t2.δZ);  X(err)

	// start tracking. everything becomes tracked because t1's T/B1:a has [-∞,∞) coverage
	// By starting tracking after t2 we verify vδBroots update in both Update and rebuild
	_0 := setKey{}; _0.Add(0)
	xtrackKeys(δbtail, t2, _0)

	_, err  = δbtail.Update(t3.δZ);  X(err)

	xat := map[zodb.Tid]string{
		t0.at: "at0",
		t1.at: "at1",
		t2.at: "at2",
		t3.at: "at3",
	}
	assertΔTtail(t.T, "init",         δbtail, t3, t.Root(), xat, t1.δxkv, t2.δxkv, t3.δxkv)
	δbtail.ForgetPast(t0.at)
	assertΔTtail(t.T, "forget ≤ at0", δbtail, t3, t.Root(), xat, t1.δxkv, t2.δxkv, t3.δxkv)
	δbtail.ForgetPast(t1.at)
	assertΔTtail(t.T, "forget ≤ at1", δbtail, t3, t.Root(), xat,          t2.δxkv, t3.δxkv)
	δbtail.ForgetPast(t3.at)
	assertΔTtail(t.T, "forget ≤ at3", δbtail, t3, t.Root(), xat,                          )
}


// ---- misc ----

func TestΔBtailClone(t_ *testing.T) {
	// ΔBtail.Clone had bug that aliased klon data to orig
	t := tNewTreeEnv(t_)
	X := exc.Raiseif

	t0 := t.CommitTree("T2/B1:a-B2:b")
	t1 := t.CommitTree("T2/B1:c-B2:d")
	δbtail := NewΔBtail(t0.at, t.db)
	_, err := δbtail.Update(t1.δZ); X(err)
	_2 := setKey{}; _2.Add(2)
	xtrackKeys(δbtail, t1, _2)
	err = δbtail.rebuildAll(); X(err)

	xat := map[zodb.Tid]string{
		t0.at: "at0",
		t1.at: "at1",
	}

	δkv1_1 := map[Key]Δstring{2:{"b","d"}}
	assertΔTtail(t.T, "orig @at1", δbtail, t1, t.Root(), xat, δkv1_1)
	δbklon := δbtail.Clone()
	assertΔTtail(t.T, "klon @at1", δbklon, t1, t.Root(), xat, δkv1_1)

	t2 := t.CommitTree("T/B1:b,2:a")
	_, err = δbtail.Update(t2.δZ); X(err)
	xat[t2.at] = "at2"

	δkv1_2 := map[Key]Δstring{1:{"a","c"}, 2:{"b","d"}}
	δkv2_2 := map[Key]Δstring{1:{"c","b"}, 2:{"d","a"}}
	assertΔTtail(t.T, "orig @at2", δbtail, t2, t.Root(), xat, δkv1_2, δkv2_2)
	assertΔTtail(t.T, "klon @at1 after orig @at->@at2", δbklon, t1, t.Root(), xat, δkv1_1)
}


// IntSets generates all sets of integers in range [0,N)
func IntSets(N int) chan []int {
	ch := make(chan []int)
	go intSets(ch, 0, N)
	return ch
}

// intSets generates all sets of integers in range [lo,hi)
func intSets(ch chan []int, lo, hi int) {
	ch <- nil // ø

	if lo < hi {
		for i := lo; i < hi; i++ {
			chTail := make(chan []int)
			go intSets(chTail, i+1, hi)
			for tail := range chTail {
				ch <- append([]int{i}, tail...) // i + tail
			}
		}
	}

	close(ch)
}

func TestIntSets(t *testing.T) {
	got := [][]int{}
	for is := range IntSets(3) {
		got = append(got, is)
	}

	I := func(v ...int) []int { return v }
	want := [][]int{I(),
			I(0), I(0,1), I(0,1,2), I(0,2),
			I(1), I(1,2),
			I(2),
	}

	if !reflect.DeepEqual(got, want) {
		t.Fatalf("error:\ngot:  %v\nwant: %v", got, want)
	}
}


// allTestKeys returns all keys from vt + ∞.
func allTestKeys(vt ...*tTreeCommit) setKey {
	allKeys := setKey{}; allKeys.Add(KeyMax) // ∞ simulating ZBigFile.Size() query
	for _, t := range vt {
		for _, b := range t.xkv {
			for k := range b.kv {
				allKeys.Add(k)
			}
		}
	}
	return allKeys
}

func sortedKeys(kv map[Key]Δstring) []Key {
	keyv := []Key{}
	for k := range kv {
		keyv = append(keyv, k)
	}
	sort.Slice(keyv, func(i, j int) bool {
		return keyv[i] < keyv[j]
	})
	return keyv
}

// XXX place
func tidvEqual(av, bv []zodb.Tid) bool {
	if len(av) != len(bv) {
		return false
	}
	for i, a := range av {
		if bv[i] != a {
			return false
		}
	}
	return true
}

func vδTEqual(vδa, vδb []map[Key]Δstring) bool {
	if len(vδa) != len(vδb) {
		return false
	}
	for i, δa := range vδa {
		if !δTEqual(δa, vδb[i]) {
			return false
		}
	}
	return true
}

func δTEqual(δa, δb map[Key]Δstring) bool {
	if len(δa) != len(δb) {
		return false
	}
	for k, δ := range δa {
		δ_, ok := δb[k]
		if !ok || δ != δ_ {
			return false
		}
	}
	return true
}

// ----------------------------------------

// ZBlk-related functions are imported at runtime by δbtail_x_test
var (
	ZTreeGetBlkData func(context.Context, *Tree, Key) (string, bool, []Node, error)
	ZGetBlkData     func(context.Context, *zodb.Connection, zodb.Oid) (string, error)
)

// xzgetBlkData loads block data from ZBlk object specified by its oid.
func xzgetBlkData(ctx context.Context, zconn *zodb.Connection, zblkOid zodb.Oid) string {
	X := exc.Raiseif

	if zblkOid == VDEL {
		return DEL
	}

	data, err := ZGetBlkData(ctx, zconn, zblkOid); X(err)
	return string(data)
}

// xzgetBlkDataAt loads block data from ZBlk object specified by oid@at.
func xzgetBlkDataAt(db *zodb.DB, zblkOid zodb.Oid, at zodb.Tid) string {
	X := exc.Raiseif

	txn, ctx := transaction.New(context.Background())
	defer txn.Abort()

	zconn, err := db.Open(ctx, &zodb.ConnOptions{At: at});  X(err)
	return xzgetBlkData(ctx, zconn, zblkOid)
}