Commit 55fdc2c1 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 54b623ba
...@@ -247,9 +247,9 @@ type _ΔTtail struct { ...@@ -247,9 +247,9 @@ type _ΔTtail struct {
vδT []ΔTree vδT []ΔTree
// set of keys that were requested to be tracked in this tree, // set of keys that were requested to be tracked in this tree,
// but for which vδT rebuild was not yet started // but for which vδT rebuild was not yet started as of @head
ktrackNew blib.RangedKeySet // {keycov} ktrackNew blib.RangedKeySet // {keycov}
// set of nodes corresponding to ktrackNew // set of nodes corresponding to ktrackNew as of @head
trackNew blib.PPTreeSubSet // PP{nodes} trackNew blib.PPTreeSubSet // PP{nodes}
// set of keys(nodes) for which rebuild is in progress // set of keys(nodes) for which rebuild is in progress
......
...@@ -58,7 +58,10 @@ package zdata ...@@ -58,7 +58,10 @@ package zdata
// tracked and is thus easy to maintain. It also can be maintained only in // tracked and is thus easy to maintain. It also can be maintained only in
// ΔFtail because ΔBtail and ΔZtail does not "know" anything about ZBigFile. // ΔFtail because ΔBtail and ΔZtail does not "know" anything about ZBigFile.
// //
// XXX concurrency //
// Concurrency
//
// XXX
import ( import (
"context" "context"
...@@ -116,12 +119,26 @@ type setOid = set.Oid ...@@ -116,12 +119,26 @@ type setOid = set.Oid
// .rev↑ // .rev↑
// {}blk | EPOCH // {}blk | EPOCH
// //
// XXX concurrent use.
//
// See also zodb.ΔTail and xbtree.ΔBtail // See also zodb.ΔTail and xbtree.ΔBtail
//
//
// Concurrency
//
// ΔFtail is safe to use in single-writer / multiple-readers mode. That is at
// any time there should be either only sole writer, or, potentially several
// simultaneous readers. The table below classifies operations:
//
// Writers: Update, ForgetPast
// Readers: Track + all queries (SliceByRev, SliceByFileRev, BlkRevAt)
//
// Note that, in particular, it is correct to run multiple Track and queries
// requests simultaneously.
type ΔFtail struct { type ΔFtail struct {
// ΔFtail merges ΔBtail with history of ZBlk // ΔFtail merges ΔBtail with history of ZBlk
δBtail *xbtree.ΔBtail δBtail *xbtree.ΔBtail
// XXX mu sync.Mutex + comment
byFile map[zodb.Oid]*_ΔFileTail // file -> vδf tail byFile map[zodb.Oid]*_ΔFileTail // file -> vδf tail
filesByRoot map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head filesByRoot map[zodb.Oid]setOid // tree-root -> {} ZBigFile<oid> as of @head
...@@ -205,8 +222,6 @@ func (δFtail *ΔFtail) Tail() zodb.Tid { return δFtail.δBtail.Tail() } ...@@ -205,8 +222,6 @@ func (δFtail *ΔFtail) Tail() zodb.Tid { return δFtail.δBtail.Tail() }
// //
// Objects in path and zblk must be with .PJar().At() == .head // Objects in path and zblk must be with .PJar().At() == .head
func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk) { func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, blkcov btree.LKeyRange, zblk ZBlk) {
// XXX locking
head := δFtail.Head() head := δFtail.Head()
fileAt := file.PJar().At() fileAt := file.PJar().At()
...@@ -226,6 +241,10 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl ...@@ -226,6 +241,10 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl
rootObj := path[0].(*btree.LOBTree) rootObj := path[0].(*btree.LOBTree)
root := rootObj.POid() root := rootObj.POid()
δFtail.mu.Lock()
defer δFtail.mu.Unlock()
files, ok := δFtail.filesByRoot[root] files, ok := δFtail.filesByRoot[root]
if !ok { if !ok {
files = setOid{} files = setOid{}
...@@ -266,6 +285,8 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl ...@@ -266,6 +285,8 @@ func (δFtail *ΔFtail) Track(file *ZBigFile, blk int64, path []btree.LONode, bl
} }
} }
// XXX vδESnapForFile
// rebuildAll rebuilds vδE for all files from trackNew requests. // rebuildAll rebuilds vδE for all files from trackNew requests.
func (δFtail *ΔFtail) rebuildAll() (err error) { func (δFtail *ΔFtail) rebuildAll() (err error) {
defer xerr.Contextf(&err, "ΔFtail rebuildAll") defer xerr.Contextf(&err, "ΔFtail rebuildAll")
...@@ -277,7 +298,7 @@ func (δFtail *ΔFtail) rebuildAll() (err error) { ...@@ -277,7 +298,7 @@ func (δFtail *ΔFtail) rebuildAll() (err error) {
for foid := range δFtail.trackNew { for foid := range δFtail.trackNew {
δFtail.trackNew.Del(foid) δFtail.trackNew.Del(foid)
δftail := δFtail.byFile[foid] δftail := δFtail.byFile[foid]
err := δftail.rebuild1(foid, δZtail, db) err := δftail._rebuild1(foid, δZtail, db)
if err != nil { if err != nil {
return err return err
} }
...@@ -288,27 +309,32 @@ func (δFtail *ΔFtail) rebuildAll() (err error) { ...@@ -288,27 +309,32 @@ func (δFtail *ΔFtail) rebuildAll() (err error) {
// rebuild1IfNeeded rebuilds vδE if there is such need. // rebuild1IfNeeded rebuilds vδE if there is such need.
// //
// it returns corresponding δftail for convenience. // it returns corresponding δftail for convenience. XXX
// the only case when vδE actually needs to be rebuilt is when the file just started to be tracked. // the only case when vδE actually needs to be rebuilt is when the file just started to be tracked.
func (δFtail *ΔFtail) rebuild1IfNeeded(foid zodb.Oid) (_ *_ΔFileTail, err error) { func (δFtail *ΔFtail) rebuild1IfNeeded(foid zodb.Oid) (vδE []_ΔFileEpoch, headRoot zodb.Oid, err error) {
// XXX locking δFtail.mu.Lock() // TODO verify that there is no in-progress writers
defer δFtail.mu.Unlock()
δftail := δFtail.byFile[foid] δftail := δFtail.byFile[foid]
if δftail.vδE != nil { if δftail.vδE != nil {
err = nil err = nil
} else { } else {
//
δFtail.trackNew.Del(foid) δFtail.trackNew.Del(foid)
δBtail := δFtail.δBtail δBtail := δFtail.δBtail
err = δftail.rebuild1(foid, δBtail.ΔZtail(), δBtail.DB()) err = δftail._rebuild1(foid, δBtail.ΔZtail(), δBtail.DB())
} }
return δftail, err return δftail, err
} }
// rebuild1 rebuilds vδE. // _rebuild1 rebuilds vδE.
func (δftail *_ΔFileTail) rebuild1(foid zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (err error) { //
// XXX -> redo to just return built vδE (vδEBuild)
// XXX must be called with δftail.mu locked.
func (δftail *_ΔFileTail) _rebuild1(foid zodb.Oid, δZtail *zodb.ΔTail, db *zodb.DB) (err error) {
defer xerr.Contextf(&err, "file<%s>: rebuild", foid) defer xerr.Contextf(&err, "file<%s>: rebuild", foid)
// XXX locking // // XXX locking
if δftail.vδE != nil { if δftail.vδE != nil {
panic("rebuild1: vδE != nil") panic("rebuild1: vδE != nil")
} }
...@@ -363,6 +389,9 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) { ...@@ -363,6 +389,9 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
defer xerr.Contextf(&err, "ΔFtail update %s -> %s", δFtail.Head(), δZ.Tid) defer xerr.Contextf(&err, "ΔFtail update %s -> %s", δFtail.Head(), δZ.Tid)
// XXX locking // XXX locking
//δFtail.mu.Lock()
//defer δFtail.mu.Unlock()
//// TODO verify that there is no in-progress readers/writers
// rebuild vδE for newly tracked files // rebuild vδE for newly tracked files
err = δFtail.rebuildAll() err = δFtail.rebuildAll()
...@@ -411,7 +440,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) { ...@@ -411,7 +440,7 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
} }
δftail.root = δE.newRoot δftail.root = δE.newRoot
δftail.vδE = append(δftail.vδE, δE) δftail.vδE = append(δftail.vδE, δE) // XXX note that we do not change older snapshots
} }
} }
...@@ -536,9 +565,11 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) { ...@@ -536,9 +565,11 @@ func (δFtail *ΔFtail) Update(δZ *zodb.EventCommit) (_ ΔF, err error) {
// ForgetPast discards all δFtail entries with rev ≤ revCut. // ForgetPast discards all δFtail entries with rev ≤ revCut.
func (δFtail *ΔFtail) ForgetPast(revCut zodb.Tid) { func (δFtail *ΔFtail) ForgetPast(revCut zodb.Tid) {
δFtail.δBtail.ForgetPast(revCut) δFtail.mu.Lock()
defer δFtail.mu.Unlock()
// TODO verify that there is no in-progress readers/writers
// XXX locking δFtail.δBtail.ForgetPast(revCut)
// TODO keep index which file changed epoch where (similarly to ΔBtail), // TODO keep index which file changed epoch where (similarly to ΔBtail),
// and, instead of scanning all files, trim vδE only on files that is really necessary. // and, instead of scanning all files, trim vδE only on files that is really necessary.
...@@ -547,8 +578,7 @@ func (δFtail *ΔFtail) ForgetPast(revCut zodb.Tid) { ...@@ -547,8 +578,7 @@ func (δFtail *ΔFtail) ForgetPast(revCut zodb.Tid) {
} }
} }
func (δftail *_ΔFileTail) forgetPast(revCut zodb.Tid) { func (δftail *_ΔFileTail) _forgetPast(revCut zodb.Tid) {
// XXX locking
icut := 0 icut := 0
for ; icut < len(δftail.vδE); icut++ { for ; icut < len(δftail.vδE); icut++ {
if δftail.vδE[icut].Rev > revCut { if δftail.vδE[icut].Rev > revCut {
...@@ -583,8 +613,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -583,8 +613,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
//fmt.Printf("\nslice f<%s> (@%s,@%s]\n", foid, lo, hi) //fmt.Printf("\nslice f<%s> (@%s,@%s]\n", foid, lo, hi)
xtail.AssertSlice(δFtail, lo, hi) xtail.AssertSlice(δFtail, lo, hi)
// XXX locking
// query .δBtail.SliceByRootRev(file.blktab, lo, hi) + // query .δBtail.SliceByRootRev(file.blktab, lo, hi) +
// merge δZBlk history with that. // merge δZBlk history with that.
...@@ -602,7 +630,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -602,7 +630,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
// δFile ────────o───────o──────x─────x──────────────────────── // δFile ────────o───────o──────x─────x────────────────────────
δftail, err := δFtail.rebuild1IfNeeded(foid) vδE, headRoot, err := δFtail.rebuild1IfNeeded(foid)
if err != nil { if err != nil {
panic(err) // XXX panic(err) // XXX
} }
...@@ -630,7 +658,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -630,7 +658,6 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
iz := len(vδZ) - 1 iz := len(vδZ) - 1
// find epoch that covers hi // find epoch that covers hi
vδE := δftail.vδE
le := len(vδE) le := len(vδE)
ie := sort.Search(le, func(i int) bool { ie := sort.Search(le, func(i int) bool {
return hi < vδE[i].Rev return hi < vδE[i].Rev
...@@ -662,7 +689,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado ...@@ -662,7 +689,7 @@ func (δFtail *ΔFtail) SliceByFileRev(zfile *ZBigFile, lo, hi zodb.Tid) /*reado
var ZinblkAt zodb.Tid // Zinblk covers [ZinblkAt,<next δT>) var ZinblkAt zodb.Tid // Zinblk covers [ZinblkAt,<next δT>)
if ie+1 == le { if ie+1 == le {
// head // head
root = δftail.root root = headRoot
head = δFtail.Head() head = δFtail.Head()
for zblk, zt := range δFtail.trackSetZBlk { for zblk, zt := range δFtail.trackSetZBlk {
inblk, ok := zt.inroot[root] inblk, ok := zt.inroot[root]
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment