Commit 46e6f6a0 authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent 7f4eb022
...@@ -22,13 +22,74 @@ package main ...@@ -22,13 +22,74 @@ package main
import ( import (
"context" "context"
"fmt"
"syscall"
log "github.com/golang/glog"
"github.com/hanwen/go-fuse/fuse" "github.com/hanwen/go-fuse/fuse"
"github.com/hanwen/go-fuse/fuse/nodefs" "github.com/hanwen/go-fuse/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
) )
// // errors as fuse.Status (not already exposed by fuse iteself)
// const (
// fEEXIST = fuse.Status(syscall.EEXIST)
// )
// eInvalError is the error wrapper signifying that underlying error is about "invalid argument".
type eInvalError struct {
err error
}
func (e *eInvalError) Error() string {
return "invalid argument: " + e.err.Error()
}
// don't propagate eInvalError.Cause -> e.err
func eINVAL(err error) *eInvalError {
return &eInvalError{err}
}
func eINVALf(format string, argv ...interface{}) *eInvalError {
return eINVAL(fmt.Errorf(format, argv...))
}
// err2LogStatus converts an error into FUSE status code and logs it appropriately.
//
// the error is logged because otherwise, if e.g. returning EINVAL or EIO
// codes, there is no more detail except the error code itself.
func err2LogStatus(err error) fuse.Status {
// no error
if err == nil {
return fuse.OK
}
// direct usage of error code - don't log
ecode, iscode := err.(syscall.Errno)
if iscode {
return fuse.Status(ecode)
}
// otherwise log as warnings EINVAL and as errors everything else
var st fuse.Status
switch errors.Cause(err).(type) {
case *eInvalError:
st = fuse.EINVAL
log.Warning(err)
default:
st = fuse.EIO
log.Error(err)
}
return st
}
// asctx represents fuse context as context.Context ready for interrupt handling. // asctx represents fuse context as context.Context ready for interrupt handling.
// //
// XXX temp. only after proper interrupt handling is not yet merged into go-fuse. // XXX temp. only after proper interrupt handling is not yet merged into go-fuse.
......
...@@ -257,6 +257,7 @@ import ( ...@@ -257,6 +257,7 @@ import (
"flag" "flag"
stdlog "log" stdlog "log"
"os" "os"
"strings"
"sync" "sync"
"syscall" "syscall"
...@@ -264,6 +265,7 @@ import ( ...@@ -264,6 +265,7 @@ import (
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"lab.nexedi.com/kirr/go123/xcontext" "lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/go123/xerr"
"lab.nexedi.com/kirr/neo/go/transaction" "lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb" "lab.nexedi.com/kirr/neo/go/zodb"
...@@ -288,12 +290,16 @@ type BigFileRoot struct { ...@@ -288,12 +290,16 @@ type BigFileRoot struct {
// /bigfile/<bigfileX>/ - served by BigFileDir. // /bigfile/<bigfileX>/ - served by BigFileDir.
type BigFileDir struct { type BigFileDir struct {
nodefs.Node nodefs.Node
oid zodb.Oid // oid of ZBigFile
// ZODB DB handle for this bigfile.
// keeps cache of connections for both head/ and @<rev>/ accesses.
zdb *zodb.DB zdb *zodb.DB
// head/ is implicitly linked to by fs // head/ is implicitly linked to by fs
// {} rev -> @<rev>/ bigfile snapshot // {} rev -> @<rev>/ bigfile snapshot
revMu sync.Mutex mu sync.Mutex
revTab map[zodb.Tid]*BigFileRev revTab map[zodb.Tid]*BigFileRev
} }
...@@ -350,30 +356,7 @@ type blkLoadState struct { ...@@ -350,30 +356,7 @@ type blkLoadState struct {
// ---------------------------------------- // ----------------------------------------
/*
// /bigfile -> Mkdir receives client request to create /bigfile/<bigfileX>.
//
// It creates <bigfileX>/head/* along the way.
func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (_ *nodefs.Inode, status fuse.Status) {
oid, err := zodb.ParseOid(name)
if err != nil {
log.Warningf("/bigfile: mkdir %q: not-oid", name)
return nil, fuse.EINVAL
}
// XXX ok to ignore mode?
// check to see if dir(oid) is already there
bfroot.mu.Lock()
_, already := bfroot.tab[oid]
bfroot.mu.Unlock()
if already {
return nil, fuse.Status(syscall.EEXIST)
}
// not there - without bfroot lock proceed to load corresponding objects from ZODB:
// create new read-only transaction for this bigfile // create new read-only transaction for this bigfile
txn, txnCtx := transaction.New(context.Background()) txn, txnCtx := transaction.New(context.Background())
defer func() { defer func() {
...@@ -432,6 +415,48 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) ( ...@@ -432,6 +415,48 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
log.Errorf("/bigfile: mkdir %q: %s", name, err) log.Errorf("/bigfile: mkdir %q: %s", name, err)
return nil, fuse.EIO return nil, fuse.EIO
} }
*/
// /bigfile -> Mkdir receives client request to create /bigfile/<bigfileX>.
//
// It creates <bigfileX>/head/* along the way.
func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (_ *nodefs.Inode, status fuse.Status) {
// XXX ok to ignore mode?
inode, err := bfroot.mkdir(name, fctx)
return inode, err2LogStatus(err)
}
func (bfroot *BigFileRoot) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/bigfile: mkdir %q", name)
oid, err := zodb.ParseOid(name)
if err != nil {
return nil, eINVALf("not oid")
}
// check to see if dir(oid) is already there
bfroot.mu.Lock()
_, already := bfroot.tab[oid]
bfroot.mu.Unlock()
if already {
return nil, syscall.EEXIST
}
// not there - without bfroot lock proceed to load corresponding objects from ZODB
zdb := zodb.NewDB(bfroot.zstor)
bf, err := bigopen(asctx(fctx), zdb, oid, &zodb.ConnOptions{})
if err != nil {
return nil, err
}
defer func() {
if err != nil {
bf.Close()
}
}()
// relock bfroot and either mkdir or EEXIST if the directory was maybe // relock bfroot and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding bfroot.mu // simultanously created while we were not holding bfroot.mu
...@@ -439,11 +464,12 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) ( ...@@ -439,11 +464,12 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
_, already = bfroot.tab[oid] _, already = bfroot.tab[oid]
if already { if already {
bfroot.mu.Unlock() bfroot.mu.Unlock()
return nil, fuse.Status(syscall.EEXIST) return nil, syscall.EEXIST
} }
bfdir := &BigFileDir{ bfdir := &BigFileDir{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
oid: oid,
zdb: zdb, zdb: zdb,
} }
...@@ -451,13 +477,6 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) ( ...@@ -451,13 +477,6 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
} }
bf := &BigFile{
txnCtx: txnCtx,
zconn: zconn,
zbf: zbf,
zbfSize: zbfSize,
}
bfdata := &BigFileData{ bfdata := &BigFileData{
Node: nodefs.NewDefaultNode(), Node: nodefs.NewDefaultNode(),
bigfile: bf, bigfile: bf,
...@@ -474,16 +493,22 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) ( ...@@ -474,16 +493,22 @@ func (bfroot *BigFileRoot) Mkdir(name string, mode uint32, fctx *fuse.Context) (
mkfile(bfhead, "at", NewSmallFile(bf.readAt)) // TODO mtime(at) = tidtime(at) mkfile(bfhead, "at", NewSmallFile(bf.readAt)) // TODO mtime(at) = tidtime(at)
// XXX mkfile(bh, "invalidations", bh.inv) // XXX mkfile(bh, "invalidations", bh.inv)
return bfdir.Inode(), fuse.OK return bfdir.Inode(), nil
} }
// XXX do we need to support rmdir? (probably no) // XXX do we need to support rmdir? (probably no)
/* // /bigfile/<bigfileX> -> Mkdir receives client request to create @<tid>/.
// /bigfile/<bigfileX> -> Mkdir receives client request to create @<tid>.
func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) { func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*nodefs.Inode, fuse.Status) {
// XXX ok to ignore mode?
inode, err := bfdir.mkdir(name, fctx)
return inode, err2LogStatus(err)
}
func (bfdir *BigFileDir) mkdir(name string, fctx *fuse.Context) (_ *nodefs.Inode, err error) {
defer xerr.Contextf(&err, "/bigfile/%s: mkdir %q", bfdir.oid, name)
var tid zodb.Tid var tid zodb.Tid
var err error
ok := false ok := false
if strings.HasPrefix(name, "@") { if strings.HasPrefix(name, "@") {
...@@ -491,13 +516,9 @@ func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*n ...@@ -491,13 +516,9 @@ func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*n
ok = (err == nil) ok = (err == nil)
} }
if !ok { if !ok {
log.Warning("/bigfile/XXX: mkdir %q: not-@tid", name) return nil, eINVALf("not @tid")
return nil, fuse.EINVAL
} }
// XXX ok to ignore mode?
// XXX vvv dups BigFileRoot.Mkdir ?
// check to see if dir(tid) is already // check to see if dir(tid) is already
bfdir.mu.Lock() bfdir.mu.Lock()
...@@ -505,47 +526,67 @@ func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*n ...@@ -505,47 +526,67 @@ func (bfdir *BigFileDir) Mkdir(name string, mode uint32, fctx *fuse.Context) (*n
bfdir.mu.Unlock() bfdir.mu.Unlock()
if already { if already {
return nil, fuse.Status(syscall.EEXIST) return nil, syscall.EEXIST
} }
// not there - without bfdir lock proceed to create @tid historical connection // not there - without bfdir lock proceed to create @tid historical connection
// create new read-only transaction for this bigfile conn bf, err := bigopen(asctx(fctx), bfdir.zdb, bfdir.oid, &zodb.ConnOptions{
txn, txnCtx := transaction.New(context.Background()) At: tid,
defer func() { })
if status != fuse.OK { if err != nil {
txn.Abort() return nil, err
} }
defer func() {
if err != nil {
bf.Close()
} }
}()
// create new DB/Connection for this @tid // relock bfdir and either mkdir or EEXIST if the directory was maybe
// simultanously created while we were not holding bfroot.mu
bfdir.mu.Lock()
_, already = bfdir.revTab[tid]
if already {
bfdir.mu.Unlock()
return nil, syscall.EEXIST
}
// XXX better ctx = transaction.PutIntoContext(ctx, txn) bfrev := &BigFileRev{
ctx, cancel := xcontext.Merge(asctx(fctx), txnCtx) Node: nodefs.NewDefaultNode(),
defer cancel() }
// XXX ok to reuse bfdir.zdb? (or better keep that only for head?) revdata := &BigFileData{
zconn, err := bfdir.zdb.Open(ctx, &zodb.ConnOptions{ Node: nodefs.NewDefaultNode(),
At: tid, bigfile: bf,
}) loading: make(map[int64]*blkLoadState),
}
... bfdir.revTab[tid] = bfrev
bfdir.mu.Unlock()
// mkdir takes filesystem treeLock - do it outside bfroot.mu
mkdir(bfdir, name, bfrev)
mkfile(bfrev, "data", revdata) mkfile(bfrev, "data", revdata)
return bfrev.Inode(), fuse.OK return bfrev.Inode(), nil
} }
// XXX -> zopen ? // bigopen opens BigFile corresponding to oid and zopt.
func openBigFile(zopt *zodb.ConnOptions) (, err error) { //
// XXX errctx // A new read-only transaction is opened.
defer xerr.Contextf(&err, "XXX") // A new ZODB connection is opened via zdb.
// A ZBigFile corresponding to oid is activated and statted.
//
// The whole result is returned as BigFile.
func bigopen(ctx context.Context, zdb *zodb.DB, oid zodb.Oid, zopt *zodb.ConnOptions) (_ *BigFile, err error) {
defer xerr.Contextf(&err, "bigopen %s %s", oid, zopt)
// create new read-only transaction for this bigfile // create new read-only transaction for this bigfile
txn, txnCtx := transaction.New(context.Background()) txn, txnCtx := transaction.New(context.Background())
defer func() { defer func() {
if status != fuse.OK { if err != nil {
txn.Abort() txn.Abort()
} }
}() }()
...@@ -554,18 +595,65 @@ func openBigFile(zopt *zodb.ConnOptions) (, err error) { ...@@ -554,18 +595,65 @@ func openBigFile(zopt *zodb.ConnOptions) (, err error) {
// create new DB/Connection for this bigfile open // create new DB/Connection for this bigfile open
// XXX better ctx = transaction.PutIntoContext(ctx, txn) // XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(asctx(fctx), txnCtx) ctx, cancel := xcontext.Merge(ctx, txnCtx)
defer cancel() defer cancel()
zconn, err := zdb.Open(ctx, zope) zconn, err := zdb.Open(ctx, zopt)
if err != nil { if err != nil {
return err return nil, err
//log.Errorf("/bigfile: mkdir %q: %s", name, err) }
//return nil, fuse.EIO
xzbf, err := zconn.Get(ctx, oid)
if err != nil {
switch errors.Cause(err).(type) {
case *zodb.NoObjectError:
return nil, eINVAL(err)
case *zodb.NoDataError:
return nil, eINVAL(err) // XXX what to do if it was existing and got deleted?
default:
return nil, err
}
}
zbf, ok := xzbf.(*ZBigFile)
if !ok {
return nil, eINVALf("%s is not a ZBigFile", typeOf(xzbf))
} }
// activate ZBigFile and keep it this way
err = zbf.PActivate(ctx)
if err != nil {
return nil, err
}
defer func() {
if err != nil {
zbf.PDeactivate()
}
}()
zbfSize, err := zbf.Size(ctx)
if err != nil {
return nil, err
}
return &BigFile{
txnCtx: txnCtx,
zconn: zconn,
zbf: zbf,
zbfSize: zbfSize,
}, nil
} }
*/
// Close release all resources of BigFile.
func (bf *BigFile) Close() error {
bf.zbf.PDeactivate()
bf.zbf = nil
transaction.Current(bf.txnCtx).Abort()
bf.zconn = nil
return nil
}
// /bigfile/<bigfileX>/head/data -> Getattr serves stat. // /bigfile/<bigfileX>/head/data -> Getattr serves stat.
func (bfdata *BigFileData) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status { func (bfdata *BigFileData) GetAttr(out *fuse.Attr, _ nodefs.File, fctx *fuse.Context) fuse.Status {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment