Commit 9bf88606 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 3e3419f0
...@@ -433,7 +433,7 @@ func (δBtail *ΔBtail) track(path []zodb.Oid, keycov KeyRange) { ...@@ -433,7 +433,7 @@ func (δBtail *ΔBtail) track(path []zodb.Oid, keycov KeyRange) {
// //
// vδT is rebuilt if there are such not-yet-handled Track requests. // vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) { func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []ΔTree, err error) {
δBtail.mu.Lock() δBtail.mu.Lock() // TODO verify that there is no in-progress writers
δTtail := δBtail.byRoot[root] // must be there δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil { if δTtail == nil {
δBtail.mu.Unlock() δBtail.mu.Unlock()
...@@ -479,7 +479,7 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ ...@@ -479,7 +479,7 @@ func (δBtail *ΔBtail) vδTSnapForTrackedKey(root zodb.Oid, key Key) (vδT []Δ
// //
// vδT is rebuilt if there are such not-yet-handled Track requests. // vδT is rebuilt if there are such not-yet-handled Track requests.
func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err error) { func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err error) {
δBtail.mu.Lock() δBtail.mu.Lock() // TODO verify that there is no in-progress writers
δTtail := δBtail.byRoot[root] // must be there δTtail := δBtail.byRoot[root] // must be there
if δTtail == nil { if δTtail == nil {
δBtail.mu.Unlock() δBtail.mu.Unlock()
...@@ -530,6 +530,9 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er ...@@ -530,6 +530,9 @@ func (δBtail *ΔBtail) vδTSnapForTracked(root zodb.Oid) (vδT []ΔTree, err er
// //
// XXX naming -> _rebuild1 ? // XXX naming -> _rebuild1 ?
func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err error) { func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err error) {
return δTtail.__runRebuildJob(root, δBtail, /*releaseLock=*/true)
}
func (δTtail *_ΔTtail) __runRebuildJob(root zodb.Oid, δBtail *ΔBtail, releaseLock bool) (err error) {
// XXX errctx // XXX errctx
job := &_RebuildJob{ready: make(chan struct{})} job := &_RebuildJob{ready: make(chan struct{})}
...@@ -546,9 +549,13 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er ...@@ -546,9 +549,13 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er
delete(δBtail.trackNewRoots, root) delete(δBtail.trackNewRoots, root)
// build δ(vδT) without the lock // build δ(vδT) without the lock
δBtail.mu.Unlock() if releaseLock {
δBtail.mu.Unlock()
}
vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db) vδTnew, δtrackSet, err := vδTBuild(root, trackNew, δBtail.δZtail, δBtail.db)
δBtail.mu.Lock() if releaseLock {
δBtail.mu.Lock()
}
// krebuildJobs -= ktrackNew // krebuildJobs -= ktrackNew
for _, r := range ktrackNew.AllRanges() { for _, r := range ktrackNew.AllRanges() {
...@@ -588,7 +595,9 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er ...@@ -588,7 +595,9 @@ func (δTtail *_ΔTtail) _runRebuildJob(root zodb.Oid, δBtail *ΔBtail) (err er
// //
// TODO optionally accept zconnOld/zconnNew from client // TODO optionally accept zconnOld/zconnNew from client
func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) { func (δBtail *ΔBtail) Update(δZ *zodb.EventCommit) (_ ΔB, err error) {
// XXX locking δBtail.mu.Lock()
defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
headOld := δBtail.Head() headOld := δBtail.Head()
defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid) defer xerr.Contextf(&err, "ΔBtail.Update %s -> %s", headOld, δZ.Tid)
...@@ -670,14 +679,14 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e ...@@ -670,14 +679,14 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
δB1 = _ΔBUpdate1{ByRoot: make(map[zodb.Oid]*_ΔTUpdate1)} δB1 = _ΔBUpdate1{ByRoot: make(map[zodb.Oid]*_ΔTUpdate1)}
// update .trackSet and vδB from .trackNew // update .trackSet and vδB from .trackNew
err = δBtail.rebuildAll() err = δBtail._rebuildAll()
if err != nil { if err != nil {
return δB1, err return δB1, err
} }
δBtail.δZtail.Append(δZ.Tid, δZ.Changev) δBtail.δZtail.Append(δZ.Tid, δZ.Changev)
// NOTE: keep vvv in sync with vδTRebuild1 // NOTE: keep vvv in sync with vδTBuild1
δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, δBtail.trackSet) δZTC, δtopsByRoot := δZConnectTracked(δZ.Changev, δBtail.trackSet)
...@@ -722,18 +731,14 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e ...@@ -722,18 +731,14 @@ func (δBtail *ΔBtail) _Update1(δZ *zodb.EventCommit) (δB1 _ΔBUpdate1, err e
return δB1, nil return δB1, nil
} }
// rebuildAll rebuilds ΔBtail taking all trackNew requests into account. // _rebuildAll rebuilds ΔBtail taking all trackNew requests into account.
func (δBtail *ΔBtail) rebuildAll() (err error) { func (δBtail *ΔBtail) _rebuildAll() (err error) {
defer xerr.Context(&err, "ΔBtail rebuildAll") defer xerr.Context(&err, "ΔBtail _rebuildAll")
// XXX locking correct?
δBtail.mu.Lock()
defer δBtail.mu.Unlock()
tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots) tracefΔBtail("\nRebuildAll @%s..@%s trackNewRoots: %s\n", δBtail.Tail(), δBtail.Head(), δBtail.trackNewRoots)
for root := range δBtail.trackNewRoots { for root := range δBtail.trackNewRoots {
δTtail := δBtail.byRoot[root] // must be there δTtail := δBtail.byRoot[root] // must be there
err = δTtail._runRebuildJob(root, δBtail) err = δTtail.__runRebuildJob(root, δBtail, /*releaseLock=*/false)
if err != nil { if err != nil {
return err return err
} }
...@@ -771,7 +776,9 @@ func (δBtail *ΔBtail) _vδBroots_Update1(root zodb.Oid, rev zodb.Tid) { ...@@ -771,7 +776,9 @@ func (δBtail *ΔBtail) _vδBroots_Update1(root zodb.Oid, rev zodb.Tid) {
// ForgetPast forgets history entries with revision ≤ revCut. // ForgetPast forgets history entries with revision ≤ revCut.
func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) { func (δBtail *ΔBtail) ForgetPast(revCut zodb.Tid) {
// XXX locking δBtail.mu.Lock()
defer δBtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
δBtail.δZtail.ForgetPast(revCut) δBtail.δZtail.ForgetPast(revCut)
......
...@@ -1055,7 +1055,7 @@ func xverifyΔBTail_rebuild_TR(t *testing.T, δbtail *ΔBtail, tj *xbtreetest.Co ...@@ -1055,7 +1055,7 @@ func xverifyΔBTail_rebuild_TR(t *testing.T, δbtail *ΔBtail, tj *xbtreetest.Co
subj := fmt.Sprintf("@%s: after Track%v", tj.AtSymb(), keys) subj := fmt.Sprintf("@%s: after Track%v", tj.AtSymb(), keys)
δbtail.assertTrack(t, subj, trackSet, trackNew) δbtail.assertTrack(t, subj, trackSet, trackNew)
δbtail.rebuildAll() δbtail._rebuildAll()
subj += " + rebuild" subj += " + rebuild"
δbtail.assertTrack(t, subj, trackSetAfterRebuild, ø) δbtail.assertTrack(t, subj, trackSetAfterRebuild, ø)
...@@ -1209,7 +1209,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1209,7 +1209,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
// track 2 + rebuild. // track 2 + rebuild.
_2 := setKey{}; _2.Add(2) _2 := setKey{}; _2.Add(2)
trackKeys(δbtail, t2, _2) trackKeys(δbtail, t2, _2)
err = δbtail.rebuildAll(); X(err) err = δbtail._rebuildAll(); X(err)
δttail := δbtail.byRoot[t.Root()] δttail := δbtail.byRoot[t.Root()]
...@@ -1276,7 +1276,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) { ...@@ -1276,7 +1276,7 @@ func TestΔBtailSliceByRootRev(t_ *testing.T) {
// after track 1 + rebuild old slices remain unchanged, but new queries return updated data // after track 1 + rebuild old slices remain unchanged, but new queries return updated data
_1 := setKey{}; _1.Add(1) _1 := setKey{}; _1.Add(1)
trackKeys(δbtail, t2, _1) trackKeys(δbtail, t2, _1)
err = δbtail.rebuildAll(); X(err) err = δbtail._rebuildAll(); X(err)
s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At) s00_ := δbtail.SliceByRootRev(t.Root(), t0.At, t0.At)
s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At) s01_ := δbtail.SliceByRootRev(t.Root(), t0.At, t1.At)
...@@ -1341,7 +1341,7 @@ func TestΔBtailClone(t_ *testing.T) { ...@@ -1341,7 +1341,7 @@ func TestΔBtailClone(t_ *testing.T) {
_, err := δbtail.Update(t1.ΔZ); X(err) _, err := δbtail.Update(t1.ΔZ); X(err)
_2 := setKey{}; _2.Add(2) _2 := setKey{}; _2.Add(2)
trackKeys(δbtail, t1, _2) trackKeys(δbtail, t1, _2)
err = δbtail.rebuildAll(); X(err) err = δbtail._rebuildAll(); X(err)
δkv1_1 := map[Key]Δstring{2:{"b","d"}} δkv1_1 := map[Key]Δstring{2:{"b","d"}}
assertΔTtail(t.T, "orig @at1", δbtail, t1, t.Root(), δkv1_1) assertΔTtail(t.T, "orig @at1", δbtail, t1, t.Root(), δkv1_1)
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment