Commit 063a03af authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 38b20311
...@@ -1243,6 +1243,22 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1243,6 +1243,22 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
// noone was loading - we became responsible to load this block // noone was loading - we became responsible to load this block
blkdata, treepath, blkcov, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk) blkdata, treepath, blkcov, zblk, blkrevMax, err := f.zfile.LoadBlk(ctx, blk)
// head/ - update δFtail + pin watchers
if f.head.rev == 0 && err == nil {
// update δFtail index
// see "3) for */head/data the following invariant is maintained..."
δFtail := f.head.bfdir.δFtail
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
err = f.readPinWatchers(ctx, blk, blkrevMax)
if err != nil {
blkdata = nil
}
}
loading.blkdata = blkdata loading.blkdata = blkdata
loading.err = err loading.err = err
...@@ -1255,17 +1271,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro ...@@ -1255,17 +1271,6 @@ func (f *BigFile) readBlk(ctx context.Context, blk int64, dest []byte) (err erro
return err return err
} }
if f.head.rev == 0 {
// update δFtail index
// see "3) for */head/data the following invariant is maintained..."
δFtail := f.head.bfdir.δFtail
δFtail.Track(f.zfile, blk, treepath, blkcov, zblk)
// we have the data - it can be used after watchers are updated
// XXX should we use ctx here? (see readPinWatchers comments)
f.readPinWatchers(ctx, blk, blkrevMax)
}
// data can be used now // data can be used now
close(loading.ready) close(loading.ready)
copy(dest, blkdata) // TODO avoid copy copy(dest, blkdata) // TODO avoid copy
...@@ -1483,7 +1488,9 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) { ...@@ -1483,7 +1488,9 @@ func (w *Watch) _pin(ctx context.Context, blk int64, rev zodb.Tid) (err error) {
// XXX do we really need to use/propagate caller context here? ideally update // XXX do we really need to use/propagate caller context here? ideally update
// watchers should be synchronous, and in practice we just use 30s timeout. // watchers should be synchronous, and in practice we just use 30s timeout.
// Should a READ interrupt cause watch update failure? -> probably no // Should a READ interrupt cause watch update failure? -> probably no
func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) { func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb.Tid) (err error) {
defer xerr.Context(&err, "pin watchers") // f.path and blk is already put into context by readBlk
// only head/ is being watched for // only head/ is being watched for
if f.head.rev != 0 { if f.head.rev != 0 {
panic("BUG: readPinWatchers: called for file under @revX/") panic("BUG: readPinWatchers: called for file under @revX/")
...@@ -1526,7 +1533,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb ...@@ -1526,7 +1533,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
var err error var err error
blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At()) blkrev, _, err = δFtail.BlkRevAt(ctx, f.zfile, blk, f.head.zconn.At())
if err != nil { if err != nil {
panic(err) // XXX return err
} }
blkrevRough = false blkrevRough = false
...@@ -1546,7 +1553,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb ...@@ -1546,7 +1553,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
pinrev, _, err := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go? pinrev, _, err := δFtail.BlkRevAt(ctx, w.file.zfile, blk, w.at) // XXX move into go?
// XXX ^^^ w.file vs f ? // XXX ^^^ w.file vs f ?
if err != nil { if err != nil {
panic(err) // XXX return err
} }
//fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev) //fmt.Printf("S: read #%d: watch @%s: pin -> @%s\n", blk, w.at, pinrev)
...@@ -1558,10 +1565,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb ...@@ -1558,10 +1565,7 @@ func (f *BigFile) readPinWatchers(ctx context.Context, blk int64, blkrevMax zodb
} }
f.watchMu.RUnlock() f.watchMu.RUnlock()
err := wg.Wait() return wg.Wait()
if err != nil {
panic(err) // XXX
}
} }
// setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request. // setupWatch sets up or updates a Watch when client sends `watch <file> @<at>` request.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment