Commit 32eb3adf authored by Kirill Smelkov

.

parent 559730a3
......@@ -313,6 +313,7 @@ type BigFile struct {
// TODO
// lastChange zodb.Tid // last change to whole bigfile as of .zconn.At view
}
// BigFileData represents "/bigfile/<bigfileX>/head/data"
......@@ -321,6 +322,21 @@ type BigFileData struct {
nodefs.Node
bigfile *BigFile
// inflight loadings from ZBigFile
loadMu sync.Mutex
loading map[int64]*blkLoadState
}
// blkLoadState represents a ZBlk load state/result.
//
// when !ready the loading is in progress.
// when ready the loading has been completed.
//
// One instance is shared between the goroutine that performs the load and
// every goroutine that waits for the same block in readBlk.
type blkLoadState struct {
// ready is closed by the loading goroutine once blkdata and err are set;
// waiters block on it and read the result fields only afterwards.
ready chan struct{}
// blkdata is the loaded block content as returned by LoadBlk.
blkdata []byte
// err is the load error; nil if the block was loaded successfully.
err error
}
......@@ -477,7 +493,7 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
ctx, cancel := xcontext.Merge(asctx(fctx), bf.txnCtx)
defer cancel()
// widen read request to be aligned with blksize granularity:
// widen read request to be aligned with blksize granularity
end := off + int64(len(dest)) // XXX overflow?
aoff := off - (off % zbf.blksize)
aend := end + (zbf.blksize - (end % zbf.blksize))
......@@ -489,34 +505,89 @@ func (bfdata *BigFileData) Read(_ nodefs.File, dest []byte, off int64, fctx *fus
blkoff := blkoff
blk := blkoff / zbf.blksize
wg.Go(func() error {
blkdata, err := zbf.LoadBlk(ctx, blk)
return bfdata.readBlk(ctx, blk, dest[blkoff-aoff:]) // XXX dest.size
})
}
err := wg.Wait()
if err != nil {
log.Printf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
}
return fuse.ReadResultData(dest[off-aoff:end - (off-aoff)]), fuse.OK
}
// readBlk serves Read to read 1 ZBlk #blk into destination buffer.
func (bfdata *BigFileData) readBlk(ctx context.Context, blk int64, dest []byte) error {
// check if someone else is already loading this block
bfdata.loadMu.Lock()
loading, already := bfdata.loading[blk]
if !already {
loading = &blkLoadState{
ready: make(chan struct{}),
}
bfdata.loading = loading
}
bfdata.loadMu.Unlock()
// if it is already loading - wait for it
if already {
select {
case <-ctx.Done():
return ctx.Err()
case <-loading.ready:
if loading.err != nil {
copy(dest, loading.blkdata)
}
return loading.err
}
}
// noone was loading - we became reponsible to load this block
blkdata, err := zbf.LoadBlk(ctx, blk)
loading.blkdata = blkdata
loading.err = err
close(loading.ready)
// data loaded with error - cleanup .loading
if loading.err != nil {
bfdata.loadMu.Lock()
delete bfdata.loading[blk]
bfdata.loadMu.Unlock()
return err
}
copy(dest[blkoff - aoff:], blkdata)
// data loaded ok
copy(dest, blkdata)
// store to kernel pagecache whole block that we've just loaded from database.
// This way, even if the user currently requested to read only small portion from it,
// it will prevent next e.g. consecutive user read request to again hit
// the DB, and instead will be served by kernel from its cache.
st := gfsconn.FileNotifyStoreCache(bfdata.Inode(), blkoff, blkdata)
//
// We cannot do this directly from reading goroutine - while reading
// kernel FUSE is holding corresponging page in pagecache locked, and if
// we would try to update that same page in the cache it would result
// in deadlock inside kernel.
//
// .loading cleanup is done once we are finished with putting the data into OS cache.
go func() {
// XXX locking - invalidation must make sure this workers are finished.
st := gfsconn.FileNotifyStoreCache(bfdata.Inode(), blk*blksize, blkdata)
bfdata.loadMu.Lock()
delete bfdata.loading[blk]
bfdata.loadMu.Unlock()
if st != fuse.OK {
return fmt.Errorf("bigfile %s: blk %d: -> pagecache: %s", zbf.POid(), blk, st)
}
return nil
})
}
err := wg.Wait()
if err != nil {
log.Printf("%s", err) // XXX + /bigfile/XXX: read [a,b): -> ...
return nil, fuse.EIO
}
return fuse.ReadResultData(dest[off-aoff:end - (off-aoff)]), fuse.OK
return nil
}
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment