Commit 9a66005f authored by Kirill Smelkov's avatar Kirill Smelkov

.

parent c71be2d4
......@@ -23,6 +23,7 @@ package main
import (
"context"
"fmt"
"sync/atomic"
"syscall"
log "github.com/golang/glog"
......@@ -31,6 +32,9 @@ import (
"github.com/hanwen/go-fuse/fuse/nodefs"
"github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xcontext"
"lab.nexedi.com/kirr/neo/go/transaction"
"lab.nexedi.com/kirr/neo/go/zodb"
)
......@@ -163,6 +167,7 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
return fssrv, fsconn, nil
}
// ---- ZODB ---
// typeOf returns ZODB type of an object.
//
......@@ -171,3 +176,140 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
func typeOf(obj interface{}) string {
return zodb.ClassOf(obj)
}
// ZConn is zodb.Connection + associated read-only transaction under which
// objects of the connection are accessed.
type ZConn struct {
*zodb.Connection
// read-only transaction under which we access zodb.Connection data.
txnCtx context.Context // XXX -> better directly store txn
// for historic @<rev> acess the connection can be shared between several bigfiles.
// since we want to free such connections when no longer needed we
// return zodb.Connection back to zodb.DB when refcnt drops to 0.
refcnt int32
}
// zopen opens new connection to ZODB database + associated read-only transaction.
func zopen(ctx context.Context, zdb *zodb.DB, zopt *zodb.ConnOptions) (_ *ZConn, err error) {
// create new read-only transaction
txn, txnCtx := transaction.New(context.Background())
defer func() {
if err != nil {
txn.Abort()
}
}()
// XXX better ctx = transaction.PutIntoContext(ctx, txn)
ctx, cancel := xcontext.Merge(ctx, txnCtx)
defer cancel()
zconn, err := zdb.Open(ctx, zopt)
if err != nil {
return nil, err
}
return &ZConn{
Connection: zconn,
txnCtx: txnCtx,
refcnt: 1,
}, nil
}
// Release decrements reference count and releases connection back to zodb.DB
// if it is no longer used.
func (zc *ZConn) Release() {
rc := atomic.AddInt32(&zc.refcnt, -1)
if rc < 0 {
panic("ZConn.Release: refcnt < 0")
}
if rc > 0 {
return
}
txn := transaction.Current(zc.txnCtx)
txn.Abort()
zc.Connection = nil
zc.txnCtx = nil
}
// Incref increments connection's reference counter by 1.
func (zc *ZConn) Incref() {
rc := atomic.AddInt32(&zc.refcnt, +1)
if rc <= 1 {
panic("Zconn.Incref: refcnt was < 1")
}
}
// zopenAt opens historic connection @<rev>.
//
// if the connection for this @<rev> was already opened - it is shared.
func (r *Root) zopenAt(ctx context.Context, rev zodb.Tid) (_ *ZConn, err error) {
// check if zconn for @<rev> is already there
r.zrevMu.Lock()
zconn := r.zrevTab[rev]
if zconn != nil {
if atomic.LoadInt32(&zconn.refcnt) > 0 {
zconn.Incref()
} else {
zconn = nil // in-progress destruction
}
}
r.zrevMu.Unlock()
if zconn != nil {
return zconn, nil
}
// not there - without zrevMu lock proceed to open it
zconn, err = zopen(ctx, r.zdb, &zodb.ConnOptions{At: rev})
if err != nil {
return nil, err
}
// relock zrevTab and either register zconn, or returun another zconn2,
// that might have been opened while we were not holding zrevMu.
r.zrevMu.Lock()
defer r.zrevMu.Unlock()
zconn2 := r.zrevTab[rev]
if zconn2 != nil {
if atomic.LoadInt32(&zconn2.refcnt) > 0 {
zconn.Release()
zconn2.Incref()
return zconn2, nil
}
// else it was another in-progress destruction
}
r.zrevTab[rev] = zconn
// schedule del zrevTab[rev] on zconn destroy
txn := transaction.Current(zconn.txnCtx)
txn.RegisterSync(&zrevTabUnregister{r, zconn})
return zconn, nil
}
// zrevTabUnregister unregisters zconn from root.zrevTab on zconn's transaction abort.
type zrevTabUnregister struct {
root *Root
zconn *ZConn
}
func (u *zrevTabUnregister) BeforeCompletion(txn transaction.Transaction) {}
func (u *zrevTabUnregister) AfterCompletion(txn transaction.Transaction) {
rev := u.zconn.At()
u.root.zrevMu.Lock()
defer u.root.zrevMu.Unlock()
// delete only if zconn is still registered - as another zconn2 might have
// been already registered instead while zconn was in zrevTab with refcnt=0.
if u.root.zrevTab[rev] == u.zconn {
delete(u.root.zrevTab, rev)
}
}
......@@ -25,6 +25,10 @@ digraph {
wcfsInvProcess -> δFtail;
wcfsInvProcess -> fuseRetrieveCache;
ZODB_go_inv -> fs1_go_inv;
ZODB_go_inv -> zeo_go_inv;
ZODB_go_inv -> neo_go_inv;
wcfsRead -> blktabGet;
wcfsRead -> δFtail;
wcfsRead -> mappingRegister;
......@@ -79,6 +83,9 @@ digraph {
headInv [label="#blk ← head/inv."]
ZODB_go_inv [label="ZODB/go\ninvalidations"]
fs1_go_inv [label="fs1/go\ninvalidations"]
zeo_go_inv [label="zeo/go\ninvalidations"]
neo_go_inv [label="neo/go\ninvalidations"]
// Btree_read [label="BTree read", style=filled fillcolor=lightyellow]
// ZBlk_read [label="ZBigFile / ZBlk* read", style=filled fillcolor=lightyellow]
// ZODB_read [label="ZODB deserialize object", style=filled fillcolor=lightyellow]
......
This diff is collapsed.
This diff is collapsed.
......@@ -201,7 +201,7 @@ def test_bigfile_empty():
# sync wcfs to ZODB
wc._sync()
# we wrote "hello world" after 10th block, but size is always mutiple of blksize.
# we wrote "hello world" after hole'th block, but size is always mutiple of blksize.
fsize = (hole + 1)*blksize
st = os.stat(fpath + "/head/data")
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment