Commit 22aa3387 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 8c94bf6e
......@@ -1007,7 +1007,7 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
// Only tracked keys are guaranteed to be present.
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree) {
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree, err error) {
xtail.AssertSlice(δBtail, lo, hi)
if traceΔBtail {
......@@ -1018,22 +1018,22 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
}
// retrieve vδT snapshot that is rebuilt to take all previous Track requests into account
vδT, err := δBtail.vδTSnapForTracked(root)
vδT, err = δBtail.vδTSnapForTracked(root)
if err != nil {
panic(err) // XXX
return nil, err
}
debugfΔBtail(" vδT: %v\n", vδT)
l := len(vδT)
if l == 0 {
return nil
return nil, nil
}
// find max j : [j].rev ≤ hi linear scan -> TODO binary search
j := l - 1
for ; j >= 0 && vδT[j].Rev > hi; j-- {}
if j < 0 {
return nil // ø
return nil, nil // ø
}
// find max i : [i].rev > lo linear scan -> TODO binary search
......@@ -1045,7 +1045,7 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
// modified via RCU: i.e. _ΔTtail.rebuild clones vδT before modifying it.
// This way the data we return to caller will stay unchanged even if
// rebuild is running simultaneously.
return vδT[i:j+1]
return vδT[i:j+1], nil
}
......
......@@ -1246,11 +1246,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
}
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12 := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22 := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
s00, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT := δttail.vδT
assertvδT("t2.vδT", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
......@@ -1286,11 +1286,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
trackKeys(δbtail, t2, _1)
err = δbtail._rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
s02_ := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At)
s12_ := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At)
s22_ := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At)
s00_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12_, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22_, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT = δttail.vδT
assertvδT("t12.vδT", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
......
......@@ -645,9 +645,17 @@ type _ZinblkOverlay struct {
// Only tracked blocks are guaranteed to be present.
//
// Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*readonly*/[]*ΔFile {
func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) (/*readonly*/[]*ΔFile, error) {
foid := zfile.POid()
//fmt.Printf("\nslice f<%s> (@%s,@%s]\n", foid, lo, hi)
vδf, err := δFtail._SliceByFileRev(foid, lo, hi)
if err != nil {
err = fmt.Errorf("slice f<%s> (@%s,@%s]: %e", foid, lo, hi, err)
}
return vδf, err
}
func (δFtail *ΔFtail) _SliceByFileRev(foid zodb.Oid, lo, hi zodb.Tid) (/*readonly*/[]*ΔFile, error) {
xtail.AssertSlice(δFtail, lo, hi)
// query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
......@@ -669,7 +677,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
vδE, headRoot, err := δFtail.vδEForFile(foid)
if err != nil {
panic(err) // XXX
return nil, err
}
var vδf []*ΔFile
......@@ -776,7 +784,10 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
// vδT for current epoch
var vδT []xbtree.ΔTree
if root != xbtree.VDEL {
vδT = δFtail.δBtail.SliceByRootRev(root, epoch, head) // NOTE @head, not hi
vδT, err = δFtail.δBtail.SliceByRootRev(root, epoch, head) // NOTE @head, not hi
if err != nil {
return nil, err
}
}
it := len(vδT) - 1
if it >= 0 {
......@@ -887,7 +898,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
vδf[i], vδf[j] = vδf[j], vδf[i]
}
return vδf
return vδf, nil
}
// ZinblkOverlay
......
......@@ -551,7 +551,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
hi := vδf[k].Rev
vδf_ok := vδf[j:k+1] // [j,k]
vδf_ := δFtail.SliceByFileRev(zfile, lo, hi)
vδf_, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
if !reflect.DeepEqual(vδf_, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf_), t.vδfstr(vδf_ok))
}
......@@ -654,7 +654,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2
lo := t1.At
hi := t4.At
vδf := δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok := []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false},
......@@ -667,7 +667,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at2, at4] -> changes to only 0, because there is no change to 2 via blktab
lo = t2.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
}
......@@ -677,7 +677,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at3, at4] -> changes to only 0, ----/----
lo = t3.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi)
vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile(nil)
if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
......
......@@ -1742,7 +1742,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
toPin := map[int64]zodb.Tid{} // blk -> @rev
δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail
vδf, err := δFtail.SliceByFileRev(f.zfile, at, headAt) // XXX locking δFtail
if err != nil {
panic(err) // XXX
}
for _, δfile := range vδf {
if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment