Commit d85bb82c authored by Kirill Smelkov's avatar Kirill Smelkov

ΔFtail concurrency

See changes in δftail.go for overview.

* t2+ΔFtail-concurrency: (39 commits)
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  .
  X zdata: Switch SliceByFileRev not to clone Zinblk
  .
  .
  .
  .
  .
  ...
parents 54b623ba 3207c0ad
...@@ -137,31 +137,55 @@ func (t *T) Head() *Commit { ...@@ -137,31 +137,55 @@ func (t *T) Head() *Commit {
// XGetCommit finds and returns Commit created with revision at. // XGetCommit finds and returns Commit created with revision at.
func (t *T) XGetCommit(at zodb.Tid) *Commit { func (t *T) XGetCommit(at zodb.Tid) *Commit {
commit, _, _ := t.getCommit(at)
if commit == nil {
panicf("no commit corresponding to @%s", at)
}
return commit
}
func (t *T) getCommit(at zodb.Tid) (commit, cprev, cnext *Commit) {
l := len(t.commitv) l := len(t.commitv)
i := sort.Search(l, func(i int) bool { i := sort.Search(l, func(i int) bool {
return at <= t.commitv[i].At return at <= t.commitv[i].At
}) })
var commit *Commit
if i < l { if i < l {
commit = t.commitv[i] commit = t.commitv[i]
if commit.At != at { if commit.At != at {
cnext = commit
commit = nil commit = nil
} else if i+1 < l {
cnext = t.commitv[i+1]
} }
} }
if commit == nil { if i > 0 {
panicf("no commit corresponding to @%s", at) cprev = t.commitv[i-1]
} }
if commit.idx != i { if commit != nil && commit.idx != i {
panicf("BUG: commit.idx (%d) != i (%d)", commit.idx, i) panicf("BUG: commit.idx (%d) != i (%d)", commit.idx, i)
} }
return commit return commit, cprev, cnext
} }
// AtSymb returns symbolic representation of at, for example "at3". // AtSymb returns symbolic representation of at, for example "at3".
// //
// at must correspond to a Commit. // at should correspond to a Commit.
func (t *T) AtSymb(at zodb.Tid) string { func (t *T) AtSymb(at zodb.Tid) string {
return t.XGetCommit(at).AtSymb() commit, cprev, cnext := t.getCommit(at)
if commit != nil {
return commit.AtSymb()
}
// at does not correspond to commit - return something like ~at2<xxxx>at3
s := "~"
if cprev != nil {
s += cprev.AtSymb() + "<"
}
s += at.String()
if cnext != nil {
s += ">" + cnext.AtSymb()
}
return s
} }
// AtSymb returns symbolic representation of c.At, for example "at3". // AtSymb returns symbolic representation of c.At, for example "at3".
......
...@@ -69,7 +69,9 @@ package xbtree ...@@ -69,7 +69,9 @@ package xbtree
// Concurrency // Concurrency
// //
// In order to allow multiple Track and queries requests to be served in // In order to allow multiple Track and queries requests to be served in
// parallel ΔBtail employs special organization of vδT rebuild process: // parallel ΔBtail employs special organization of vδT rebuild process where
// complexity of concurrency is reduced to math on merging updates to vδT and
// trackSet, and on key range lookup:
// //
// 1. vδT is managed under read-copy-update (RCU) discipline: before making // 1. vδT is managed under read-copy-update (RCU) discipline: before making
// any vδT change the mutator atomically clones whole vδT and applies its // any vδT change the mutator atomically clones whole vδT and applies its
...@@ -117,13 +119,21 @@ package xbtree ...@@ -117,13 +119,21 @@ package xbtree
// //
// vδT/(T₁∪T₂) = vδT/T₁ | vδT/T₂ // vδT/(T₁∪T₂) = vδT/T₁ | vδT/T₂
// //
// i.e. vδT computed for tracked set being union of T₁ and T₂ is the same // ( i.e. vδT computed for tracked set being union of T₁ and T₂ is the
// as merge of vδT computed for tracked set T₁ and vδT computed for tracked // same as merge of vδT computed for tracked set T₁ and vδT computed
// set T₂. // for tracked set T₂ )
// //
// this merge property allows to run computation for δ(vδT) independently // and that
// and with ΔBtail unlocked, which in turn enables running several //
// Track/queries in parallel. // trackSet | (δPP₁|δPP₂) = (trackSet|δPP₁) | (trackSet|δPP₂)
//
// ( i.e. tracking set updated for union of δPP₁ and δPP₂ is the same
// as union of tracking set updated with δPP₁ and tracking set updated
// with δPP₂ )
//
// these merge properties allow to run computation for δ(vδT) and δ(trackSet)
// independently and with ΔBtail unlocked, which in turn enables running
// several Track/queries in parallel.
// //
// 4. while vδT rebuild is being run, krebuildJobs keeps corresponding keycov // 4. while vδT rebuild is being run, krebuildJobs keeps corresponding keycov
// entry to indicate in-progress rebuild. Should a query need vδT for keys // entry to indicate in-progress rebuild. Should a query need vδT for keys
...@@ -247,9 +257,9 @@ type _ΔTtail struct { ...@@ -247,9 +257,9 @@ type _ΔTtail struct {
vδT []ΔTree vδT []ΔTree
// set of keys that were requested to be tracked in this tree, // set of keys that were requested to be tracked in this tree,
// but for which vδT rebuild was not yet started // but for which vδT rebuild was not yet started as of @head
ktrackNew blib.RangedKeySet // {keycov} ktrackNew blib.RangedKeySet // {keycov}
// set of nodes corresponding to ktrackNew // set of nodes corresponding to ktrackNew as of @head
trackNew blib.PPTreeSubSet // PP{nodes} trackNew blib.PPTreeSubSet // PP{nodes}
// set of keys(nodes) for which rebuild is in progress // set of keys(nodes) for which rebuild is in progress
...@@ -672,13 +682,13 @@ func (δTtail *_ΔTtail) __rebuild(root zodb.Oid, δBtail *ΔBtail, releaseLock ...@@ -672,13 +682,13 @@ func (δTtail *_ΔTtail) __rebuild(root zodb.Oid, δBtail *ΔBtail, releaseLock
// //
// TODO optionally accept zconnOld/zconnNew from client // TODO optionally accept zconnOld/zconnNew from client
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) { func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
δBtail.mu.Lock() δBtail.mu.Lock()
defer δBtail.mu.Unlock() defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers // TODO verify that there is no in-progress readers/writers
headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
δB1, err := δBtail._Update1(δZ) δB1, err := δBtail._Update1(δZ)
δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]ΔValue)} δB := ΔB{Rev: δZ.Tid, ByRoot: make(map[zodb.Oid]map[Key]ΔValue)}
...@@ -997,7 +1007,7 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value, ...@@ -997,7 +1007,7 @@ func (δBtail *ΔBtail) GetAt(root zodb.Oid, key Key, at zodb.Tid) (value Value,
// Only tracked keys are guaranteed to be present. // Only tracked keys are guaranteed to be present.
// //
// Note: contrary to regular go slicing, low is exclusive while high is inclusive. // Note: contrary to regular go slicing, low is exclusive while high is inclusive.
func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree) { func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readonly*/vδT []ΔTree, err error) {
xtail.AssertSlice(δBtail, lo, hi) xtail.AssertSlice(δBtail, lo, hi)
if traceΔBtail { if traceΔBtail {
...@@ -1008,22 +1018,22 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon ...@@ -1008,22 +1018,22 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
} }
// retrieve vδT snapshot that is rebuilt to take all previous Track requests into account // retrieve vδT snapshot that is rebuilt to take all previous Track requests into account
vδT, err := δBtail.vδTSnapForTracked(root) vδT, err = δBtail.vδTSnapForTracked(root)
if err != nil { if err != nil {
panic(err) // XXX return nil, err
} }
debugfΔBtail(" vδT: %v\n", vδT) debugfΔBtail(" vδT: %v\n", vδT)
l := len(vδT) l := len(vδT)
if l == 0 { if l == 0 {
return nil return nil, nil
} }
// find max j : [j].rev ≤ hi linear scan -> TODO binary search // find max j : [j].rev ≤ hi linear scan -> TODO binary search
j := l - 1 j := l - 1
for ; j >= 0 && vδT[j].Rev > hi; j-- {} for ; j >= 0 && vδT[j].Rev > hi; j-- {}
if j < 0 { if j < 0 {
return nil // ø return nil, nil // ø
} }
// find max i : [i].rev > lo linear scan -> TODO binary search // find max i : [i].rev > lo linear scan -> TODO binary search
...@@ -1035,7 +1045,7 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon ...@@ -1035,7 +1045,7 @@ func (δBtail *ΔBtail) SliceByRootRev(root zodb.Oid, lo, hi zodb.Tid) (/*readon
// modified via RCU: i.e. _ΔTtail.rebuild clones vδT before modifying it. // modified via RCU: i.e. _ΔTtail.rebuild clones vδT before modifying it.
// This way the data we return to caller will stay unchanged even if // This way the data we return to caller will stay unchanged even if
// rebuild is running simultaneously. // rebuild is running simultaneously.
return vδT[i:j+1] return vδT[i:j+1], nil
} }
......
...@@ -1246,11 +1246,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1246,11 +1246,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want) t.Errorf("%s:\nhave: %s\nwant: %s", subj, have, want)
} }
s00 := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At) s00, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01 := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At) s01, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02 := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At) s02, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12 := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At) s12, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22 := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At) s22, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT := δttail.vδT vδT := δttail.vδT
assertvδT("t2.vδT", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}}) assertvδT("t2.vδT", vδT, ΔT{t1.At, δ{2:{f,g}}}, ΔT{t2.At, δ{2:{g,h}}})
...@@ -1286,11 +1286,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1286,11 +1286,11 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
trackKeys(δbtail, t2, _1) trackKeys(δbtail, t2, _1)
err = δbtail._rebuildAll(); X(err) err = δbtail._rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At) s00_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At); X(err)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At) s01_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At); X(err)
s02_ := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At) s02_, err := δbtail.SliceByRootRev(t.Root(), t0.At, t2.At); X(err)
s12_ := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At) s12_, err := δbtail.SliceByRootRev(t.Root(), t1.At, t2.At); X(err)
s22_ := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At) s22_, err := δbtail.SliceByRootRev(t.Root(), t2.At, t2.At); X(err)
vδT = δttail.vδT vδT = δttail.vδT
assertvδT("t12.vδT", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}}) assertvδT("t12.vδT", vδT, ΔT{t1.At, δ{1:{a,b},2:{f,g}}}, ΔT{t2.At, δ{1:{b,c},2:{g,h}}})
......
This diff is collapsed.
...@@ -242,12 +242,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -242,12 +242,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
// update vδf + co for t1 // update vδf + co for t1
vδf = append(vδf, &ΔFile{Rev: t1.At, Epoch: true}) vδf = append(vδf, &ΔFile{Rev: t1.At, Epoch: true})
vδE = append(vδE, _ΔFileEpoch{ vδE = append(vδE, _ΔFileEpoch{
Rev: t1.At, Rev: t1.At,
oldRoot: zodb.InvalidOid, oldRoot: zodb.InvalidOid,
newRoot: t.Root(), newRoot: t.Root(),
oldBlkSize: -1, oldBlkSize: -1,
newBlkSize: blksize, newBlkSize: blksize,
oldTrackSetZBlk: nil, oldZinblk: nil,
}) })
epochv = append(epochv, t1.At) epochv = append(epochv, t1.At)
for blk, zblk := range δt1 { for blk, zblk := range δt1 {
...@@ -305,7 +305,11 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -305,7 +305,11 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
for blk, zblk := range test.δblkTab { for blk, zblk := range test.δblkTab {
zprev, ok := blkTab[blk] zprev, ok := blkTab[blk]
if ok { if ok {
delete(Zinblk[zprev], blk) inblk := Zinblk[zprev]
inblk.Del(blk)
if len(inblk) == 0 {
delete(Zinblk, zprev)
}
} else { } else {
zprev = ø zprev = ø
} }
...@@ -423,12 +427,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -423,12 +427,12 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
δE.oldBlkSize = -1 δE.oldBlkSize = -1
δE.newBlkSize = blksize δE.newBlkSize = blksize
} }
oldTrackSetZBlk := map[zodb.Oid]setI64{} oldZinblk := map[zodb.Oid]setI64{}
for zblk, inblk := range ZinblkPrev { for zblk, inblk := range ZinblkPrev {
oid, _ := commit.XGetBlkByName(zblk) oid, _ := commit.XGetBlkByName(zblk)
oldTrackSetZBlk[oid] = inblk oldZinblk[oid] = inblk
} }
δE.oldTrackSetZBlk = oldTrackSetZBlk δE.oldZinblk = oldZinblk
vδE = append(vδE, δE) vδE = append(vδE, δE)
} }
...@@ -445,26 +449,60 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -445,26 +449,60 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
retrackAll() retrackAll()
} }
// verify δFtail.trackSetZBlk // verify byRoot
trackZinblk := map[string]setI64{} trackRfiles := map[zodb.Oid]setOid{}
for oid, zt := range δFtail.trackSetZBlk { for root, rt := range δFtail.byRoot {
zblki := commit.ZBlkTab[oid] trackRfiles[root] = rt.ftrackSet
for root, blocks := range zt.inroot { }
if root != t.Root() { filesOK := setOid{}
t.Errorf(".trackSetZBlk: zblk %s points to unexpected blktab %s", zblki.Name, t.Root()) if !delfile {
continue filesOK.Add(foid)
} }
RfilesOK := map[zodb.Oid]setOid{}
if len(filesOK) != 0 {
RfilesOK[t.Root()] = filesOK
}
if !reflect.DeepEqual(trackRfiles, RfilesOK) {
t.Errorf("Rfiles:\nhave: %v\nwant: %v", trackRfiles, RfilesOK)
}
inblk, ok := trackZinblk[zblki.Name] // verify Zinroot
if !ok { trackZinroot := map[string]setOid{}
inblk = setI64{} for zoid, inroot := range δFtail.ztrackInRoot {
trackZinblk[zblki.Name] = inblk zblki := commit.ZBlkTab[zoid]
trackZinroot[zblki.Name] = inroot.Clone() // XXX clone needed?
}
Zinroot := map[string]setOid{}
for zblk := range Zinblk {
inroot := setOid{}; inroot.Add(t.Root())
Zinroot[zblk] = inroot
}
if !reflect.DeepEqual(trackZinroot, Zinroot) {
t.Errorf("Zinroot:\nhave: %v\nwant: %v", trackZinroot, Zinroot)
}
// verify Zinblk
trackZinblk := map[string]setI64{}
switch {
case len(δFtail.byRoot) == 0:
// ok
case len(δFtail.byRoot) == 1:
rt, ok := δFtail.byRoot[t.Root()]
if !ok {
t.Errorf(".byRoot points to unexpected blktab")
} else {
for zoid, inblk := range rt.ztrackInBlk {
zblki := commit.ZBlkTab[zoid]
trackZinblk[zblki.Name] = inblk.Clone() // XXX clone needed?
} }
inblk.Update(blocks)
} }
default:
t.Errorf("len(.byRoot) != (0,1) ; byRoot: %v", δFtail.byRoot)
} }
if !reflect.DeepEqual(trackZinblk, Zinblk) { if !reflect.DeepEqual(trackZinblk, Zinblk) {
t.Errorf(".trackSetZBlk:\n~have: %v\n want: %v", trackZinblk, Zinblk) t.Errorf("Zinblk:\nhave: %v\nwant: %v", trackZinblk, Zinblk)
} }
// ForgetPast configured threshold // ForgetPast configured threshold
...@@ -485,16 +523,6 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -485,16 +523,6 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
vδE = vδE[icut:] vδE = vδE[icut:]
} }
// verify δFtail.filesByRoot
filesByRootOK := map[zodb.Oid]setOid{}
if !delfile {
__ := setOid{}; __.Add(foid)
filesByRootOK[t.Root()] = __
}
if !reflect.DeepEqual(δFtail.filesByRoot, filesByRootOK) {
t.Errorf("filesByRoot:\nhave: %v\nwant: %v", δFtail.filesByRoot, filesByRootOK)
}
// verify δftail.root // verify δftail.root
δftail := δFtail.byFile[foid] δftail := δFtail.byFile[foid]
rootOK := t.Root() rootOK := t.Root()
...@@ -523,7 +551,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -523,7 +551,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
hi := vδf[k].Rev hi := vδf[k].Rev
vδf_ok := vδf[j:k+1] // [j,k] vδf_ok := vδf[j:k+1] // [j,k]
vδf_ := δFtail.SliceByFileRev(zfile, lo, hi) vδf_, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
if !reflect.DeepEqual(vδf_, vδf_ok) { if !reflect.DeepEqual(vδf_, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf_), t.vδfstr(vδf_ok)) t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf_), t.vδfstr(vδf_ok))
} }
...@@ -547,7 +575,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) { ...@@ -547,7 +575,7 @@ func testΔFtail(t_ *testing.T, testq chan ΔFTestEntry) {
at := vδf[j].Rev at := vδf[j].Rev
blkRev := blkRevAt[at] blkRev := blkRevAt[at]
for _, blk := range blkv { for _, blk := range blkv {
rev, exact := δFtail.BlkRevAt(ctx, zfile, blk, at) rev, exact, err := δFtail.BlkRevAt(ctx, zfile, blk, at); X(err)
revOK, ok := blkRev[blk] revOK, ok := blkRev[blk]
if !ok { if !ok {
k := len(epochv) - 1 k := len(epochv) - 1
...@@ -626,7 +654,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) { ...@@ -626,7 +654,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2 // (at1, at4] -> changes to both 0 and 1, because they both are changed in the same bucket @at2
lo := t1.At lo := t1.At
hi := t4.At hi := t4.At
vδf := δFtail.SliceByFileRev(zfile, lo, hi) vδf, err := δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok := []*ΔFile{ vδf_ok := []*ΔFile{
&ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true}, &ΔFile{Rev: t2.At, Blocks: b(0,1), Size: true},
&ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false}, &ΔFile{Rev: t3.At, Blocks: b(0,1), Size: false},
...@@ -639,7 +667,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) { ...@@ -639,7 +667,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at2, at4] -> changes to only 0, because there is no change to 2 via blktab // (at2, at4] -> changes to only 0, because there is no change to 2 via blktab
lo = t2.At lo = t2.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi) vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile{ vδf_ok = []*ΔFile{
&ΔFile{Rev: t3.At, Blocks: b(0), Size: false}, &ΔFile{Rev: t3.At, Blocks: b(0), Size: false},
} }
...@@ -649,7 +677,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) { ...@@ -649,7 +677,7 @@ func TestΔFtailSliceUntrackedUniform(t_ *testing.T) {
// (at3, at4] -> changes to only 0, ----/---- // (at3, at4] -> changes to only 0, ----/----
lo = t3.At lo = t3.At
vδf = δFtail.SliceByFileRev(zfile, lo, hi) vδf, err = δFtail.SliceByFileRev(zfile, lo, hi); X(err)
vδf_ok = []*ΔFile(nil) vδf_ok = []*ΔFile(nil)
if !reflect.DeepEqual(vδf, vδf_ok) { if !reflect.DeepEqual(vδf, vδf_ok) {
t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok)) t.Errorf("slice (@%s,@%s]:\nhave: %v\nwant: %v", t.AtSymb(lo), t.AtSymb(hi), t.vδfstr(vδf), t.vδfstr(vδf_ok))
......
...@@ -1085,22 +1085,29 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) { ...@@ -1085,22 +1085,29 @@ func (f *BigFile) invalidateBlk(ctx context.Context, blk int64) (err error) {
// //
// if we have the data - preserve it under @revX/bigfile/file[blk]. // if we have the data - preserve it under @revX/bigfile/file[blk].
if int64(len(blkdata)) == blksize { if int64(len(blkdata)) == blksize {
func() { err := func() error {
// store retrieved data back to OS cache for file @<rev>/file[blk] // store retrieved data back to OS cache for file @<rev>/file[blk]
δFtail := f.head.bfdir.δFtail δFtail := f.head.bfdir.δFtail
blkrev, _ := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At()) blkrev, _, err := δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
return err
}
frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid()) frev, funlock, err := groot.lockRevFile(blkrev, f.zfile.POid())
if err != nil { if err != nil {
log.Errorf("BUG: %s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err) return fmt.Errorf("BUG: %s", err)
return
} }
defer funlock() defer funlock()
st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata) st := fsconn.FileNotifyStoreCache(frev.Inode(), off, blkdata)
if st != fuse.OK { if st != fuse.OK {
log.Errorf("BUG: %s: invalidate blk #%d: %s: store cache: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, frev.path(), st) return fmt.Errorf("BUG: %s: store cache: %s", frev.path(), st)
} }
return nil
}() }()
if err != nil {
log.Errorf("%s: invalidate blk #%d: %s (ignoring, but reading @revX/bigfile will be slow)", f.path(), blk, err)
}
} }
// invalidate file/head/data[blk] in OS file cache. // invalidate file/head/data[blk] in OS file cache.
...@@ -1566,7 +1573,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr ...@@ -1566,7 +1573,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// we'll relock atMu again and recheck blkrev vs w.at after. // we'll relock atMu again and recheck blkrev vs w.at after.
w.atMu.RUnlock() w.atMu.RUnlock()
blkrev, _ = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At()) var err error
blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil {
panic(err) // XXX
}
blkrevRough = false blkrevRough = false
w.atMu.RLock() w.atMu.RLock()
...@@ -1582,8 +1593,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr ...@@ -1582,8 +1593,11 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, treepath []btr
// and most of them would be on different w.at - cache of the file will // and most of them would be on different w.at - cache of the file will
// be lost. Via pinning to particular block revision, we make sure the // be lost. Via pinning to particular block revision, we make sure the
// revision to pin is the same on all clients, and so file cache is shared. // revision to pin is the same on all clients, and so file cache is shared.
pinrev, _ := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go? pinrev, _, err := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
// XXX ^^^ w.file vs f ? // XXX ^^^ w.file vs f ?
if err != nil {
panic(err) // XXX
}
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev) //fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
wg.Go(func(ctx context.Context) error { wg.Go(func(ctx context.Context) error {
...@@ -1728,7 +1742,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1728,7 +1742,11 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
toPin := map[int64]zodb.Tid{} // blk -> @rev toPin := map[int64]zodb.Tid{} // blk -> @rev
δFtail := bfdir.δFtail δFtail := bfdir.δFtail
for _, δfile := range δFtail.SliceByFileRev(f.zfile, at, headAt) { // XXX locking δFtail vδf, err := δFtail.SliceByFileRev(f.zfile, at, headAt) // XXX locking δFtail
if err != nil {
panic(err) // XXX
}
for _, δfile := range vδf {
if δfile.Epoch { if δfile.Epoch {
// file epochs are currently forbidden (see watcher), so the only // file epochs are currently forbidden (see watcher), so the only
// case when we could see an epoch here is creation of // case when we could see an epoch here is creation of
...@@ -1764,7 +1782,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T ...@@ -1764,7 +1782,10 @@ func (wlink *WatchLink) setupWatch(ctx context.Context, foid zodb.Oid, at zodb.T
continue continue
} }
toPin[blk], _ = δFtail.BlkRevAt(ctx, f.zfile, blk, at) // XXX err toPin[blk], _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, at)
if err != nil {
panic(err) // XXX
}
} }
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment