Commit a54bca4b authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Open source UnionFS, a FUSE union mount filesystem.

parent 4e6239d4
# Use "gomake install" to build and install this package.
include $(GOROOT)/src/Make.inc
TARG=unionfs
GOFILES=main.go
DEPS=../../fuse ../../unionfs
include $(GOROOT)/src/Make.cmd
package main
import (
"flag"
"fmt"
"github.com/hanwen/go-fuse/fuse"
"github.com/hanwen/go-fuse/unionfs"
"os"
)
func main() {
debug := flag.Bool("debug", false, "debug on")
threaded := flag.Bool("threaded", true, "debug on")
delcache_ttl := flag.Float64("deletion_cache_ttl", 5.0, "Deletion cache TTL in seconds.")
branchcache_ttl := flag.Float64("branchcache_ttl", 5.0, "Branch cache TTL in seconds.")
deldirname := flag.String(
"deletion_dirname", "GOUNIONFS_DELETIONS", "Directory name to use for deletions.")
flag.Parse()
if len(flag.Args()) < 2 {
fmt.Println("Usage:\n main MOUNTPOINT BASEDIR")
os.Exit(2)
}
mountpoint := flag.Arg(0)
ufsOptions := unionfs.UnionFsOptions{
DeletionCacheTTLSecs: *delcache_ttl,
BranchCacheTTLSecs: *branchcache_ttl,
DeletionDirName: *deldirname,
}
options := unionfs.AutoUnionFsOptions{
UnionFsOptions: ufsOptions,
}
gofs := unionfs.NewAutoUnionFs(flag.Arg(1), options)
conn := fuse.NewPathFileSystemConnector(gofs)
mountState := fuse.NewMountState(conn)
mountState.Debug = *debug
fmt.Printf("Mounting...\n")
err := mountState.Mount(mountpoint)
if err != nil {
fmt.Printf("MountFuse fail: %v\n", err)
os.Exit(1)
}
fmt.Printf("Mounted!\n")
mountState.Loop(*threaded)
}
include $(GOROOT)/src/Make.inc
TARG=github.com/hanwen/go-fuse/unionfs
GOFILES=unionfs.go \
dircache.go \
timedcache.go \
cachingfs.go \
autounion.go \
DEPS=../fuse
include $(GOROOT)/src/Make.pkg
package unionfs
import (
"fmt"
"github.com/hanwen/go-fuse/fuse"
"log"
"os"
"path/filepath"
"strings"
"syscall"
"sync"
"time"
)
// Creates unions for all files under a given directory,
// walking the tree and looking for directories D which have a
// D/READONLY symlink.
//
// A union for A/B/C will placed under directory A-B-C.
type AutoUnionFs struct {
fuse.DefaultPathFilesystem
lock sync.RWMutex
knownFilesystems map[string]*UnionFs
root string
connector *fuse.PathFileSystemConnector
options *AutoUnionFsOptions
}
type AutoUnionFsOptions struct {
UnionFsOptions
}
func NewAutoUnionFs(directory string, options AutoUnionFsOptions) *AutoUnionFs {
a := new(AutoUnionFs)
a.knownFilesystems = make(map[string]*UnionFs)
a.options = &options
a.root = directory
return a
}
func (me *AutoUnionFs) Mount(connector *fuse.PathFileSystemConnector) fuse.Status {
me.connector = connector
time.AfterFunc(0.1e9, func() { me.updateKnownFses() })
return fuse.OK
}
func (me *AutoUnionFs) addFs(roots []string) {
relative := strings.TrimLeft(strings.Replace(roots[0], me.root, "", -1), "/")
name := strings.Replace(relative, "/", "-", -1)
if name == "config" || name == "status" {
log.Println("Illegal name for overlay", roots)
return
}
me.lock.Lock()
var gofs *UnionFs
if me.knownFilesystems[name] == nil {
log.Println("Adding UnionFs for roots", roots)
gofs = NewUnionfs(roots, me.options.UnionFsOptions)
me.knownFilesystems[name] = gofs
}
me.lock.Unlock()
if gofs != nil {
me.connector.Mount("/"+name, gofs)
}
}
// TODO - should hide these methods.
func (me *AutoUnionFs) VisitDir(path string, f *os.FileInfo) bool {
ro := filepath.Join(path, "READONLY")
fi, err := os.Lstat(ro)
if err == nil && fi.IsSymlink() {
// TODO - should recurse and chain all READONLYs
// together.
me.addFs([]string{path, ro})
}
return true
}
func (me *AutoUnionFs) VisitFile(path string, f *os.FileInfo) {
}
func (me *AutoUnionFs) updateKnownFses() {
log.Println("Looking for new filesystems")
filepath.Walk(me.root, me, nil)
}
func (me *AutoUnionFs) Readlink(path string) (out string, code fuse.Status) {
comps := strings.Split(path, filepath.SeparatorString, -1)
if comps[0] != "config" {
return "", fuse.ENOENT
}
name := comps[1]
me.lock.RLock()
defer me.lock.RUnlock()
fs := me.knownFilesystems[name]
if fs == nil {
return "", fuse.ENOENT
}
return fs.Roots()[0], fuse.OK
}
func (me *AutoUnionFs) GetAttr(path string) (*fuse.Attr, fuse.Status) {
if path == "" || path == "config" || path == "status" {
a := &fuse.Attr{
Mode: fuse.S_IFDIR | 0755,
}
return a, fuse.OK
}
if path == "status/gounionfs_version" {
a := &fuse.Attr{
Mode: fuse.S_IFREG | 0644,
}
return a, fuse.OK
}
comps := strings.Split(path, filepath.SeparatorString, -1)
me.lock.RLock()
defer me.lock.RUnlock()
if len(comps) > 1 && comps[0] == "config" {
fs := me.knownFilesystems[comps[1]]
if fs == nil {
return nil, fuse.ENOENT
}
a := &fuse.Attr{
Mode: syscall.S_IFLNK | 0644,
}
return a, fuse.OK
}
if me.knownFilesystems[path] != nil {
return &fuse.Attr{
Mode: fuse.S_IFDIR | 0755,
},fuse.OK
}
return nil, fuse.ENOENT
}
func (me *AutoUnionFs) StatusDir() (stream chan fuse.DirEntry, status fuse.Status) {
stream = make(chan fuse.DirEntry, 1)
stream <- fuse.DirEntry{
Name: "gounionfs_version",
Mode: fuse.S_IFREG | 0644,
}
close(stream)
return stream, fuse.OK
}
func (me *AutoUnionFs) OpenDir(name string) (stream chan fuse.DirEntry, status fuse.Status) {
switch name {
case "status":
return me.StatusDir()
case "config":
me.updateKnownFses()
case "/":
name = ""
case "":
default:
panic(fmt.Sprintf("Don't know how to list dir %v", name))
}
me.lock.RLock()
defer me.lock.RUnlock()
stream = make(chan fuse.DirEntry, len(me.knownFilesystems)+5)
for k, _ := range me.knownFilesystems {
mode := fuse.S_IFDIR | 0755
if name == "config" {
mode = syscall.S_IFLNK | 0644
}
stream <- fuse.DirEntry{
Name: k,
Mode: uint32(mode),
}
}
if name == "" {
stream <- fuse.DirEntry{
Name: "config",
Mode: uint32(fuse.S_IFDIR | 0755),
}
stream <- fuse.DirEntry{
Name: "status",
Mode: uint32(fuse.S_IFDIR | 0755),
}
}
close(stream)
return stream, status
}
package unionfs
import (
"github.com/hanwen/go-fuse/fuse"
"sync"
)
type attrResponse struct {
attr *fuse.Attr
code fuse.Status
}
type dirResponse struct {
entries []fuse.DirEntry
code fuse.Status
}
type linkResponse struct {
linkContent string
code fuse.Status
}
// Caches readdir and getattr()
type CachingFileSystem struct {
fuse.WrappingPathFilesystem
attributesLock sync.RWMutex
attributes map[string]attrResponse
dirsLock sync.RWMutex
dirs map[string]dirResponse
linksLock sync.RWMutex
links map[string]linkResponse
}
func NewCachingFileSystem(pfs fuse.PathFilesystem) *CachingFileSystem {
c := new(CachingFileSystem)
c.Original = pfs
c.attributes = make(map[string]attrResponse)
c.dirs = make(map[string]dirResponse)
c.links = make(map[string]linkResponse)
return c
}
func (me *CachingFileSystem) GetAttr(name string) (*fuse.Attr, fuse.Status) {
me.attributesLock.RLock()
v, ok := me.attributes[name]
me.attributesLock.RUnlock()
if ok {
return v.attr, v.code
}
var r attrResponse
r.attr, r.code = me.Original.GetAttr(name)
// TODO - could do async.
me.attributesLock.Lock()
me.attributes[name] = r
me.attributesLock.Unlock()
return r.attr, r.code
}
func (me *CachingFileSystem) Readlink(name string) (string, fuse.Status) {
me.linksLock.RLock()
v, ok := me.links[name]
me.linksLock.RUnlock()
if ok {
return v.linkContent, v.code
}
v.linkContent, v.code = me.Original.Readlink(name)
// TODO - could do async.
me.linksLock.Lock()
me.links[name] = v
me.linksLock.Unlock()
return v.linkContent, v.code
}
func (me *CachingFileSystem) OpenDir(name string) (stream chan fuse.DirEntry, status fuse.Status) {
me.dirsLock.RLock()
v, ok := me.dirs[name]
me.dirsLock.RUnlock()
if !ok {
origStream, code := me.Original.OpenDir(name)
if code != fuse.OK {
return nil, code
}
v.code = code
for {
d := <-origStream
if d.Name == "" {
break
}
v.entries = append(v.entries, d)
}
me.dirsLock.Lock()
me.dirs[name] = v
me.dirsLock.Unlock()
}
stream = make(chan fuse.DirEntry)
go func() {
for _, d := range v.entries {
stream <- d
}
stream <- fuse.DirEntry{}
}()
return stream, v.code
}
package unionfs
import (
"os"
"sync"
"log"
"time"
)
/*
On error, returns an empty map, since we have little options
for outputting any other diagnostics.
*/
func newDirnameMap(dir string) map[string]bool {
result := make(map[string]bool)
f, err := os.Open(dir)
if err != nil {
log.Printf("newDirnameMap(): %v %v", dir, err)
return result
}
names, err := f.Readdirnames(-1)
if err != nil {
log.Printf("newDirnameMap(): readdirnames %v %v", dir, err)
return result
}
for _, n := range names {
result[n] = true
}
return result
}
/*
Caches names in a directory for some time.
If called when the cache is expired, the filenames are read afresh in
the background.
*/
type DirCache struct {
dir string
ttlNs int64
// Protects data below.
lock sync.RWMutex
// If nil, you may call refresh() to schedule a new one.
names map[string]bool
updateRunning bool
}
func (me *DirCache) setMap(newMap map[string]bool) {
me.lock.Lock()
defer me.lock.Unlock()
me.names = newMap
me.updateRunning = false
_ = time.AfterFunc(me.ttlNs,
func() {
me.lock.Lock()
me.names = nil
me.lock.Unlock()
})
}
// Try to refresh: if another update is already running, do nothing,
// otherwise, read the directory and set it.
func (me *DirCache) maybeRefresh() {
me.lock.Lock()
defer me.lock.Unlock()
if me.updateRunning {
return
}
me.updateRunning = true
go func() {
me.setMap(newDirnameMap(me.dir))
}()
}
func (me *DirCache) RemoveEntry(name string) {
me.lock.Lock()
defer me.lock.Unlock()
if me.names == nil {
go me.maybeRefresh()
return
}
me.names[name] = false, false
}
func (me *DirCache) AddEntry(name string) {
me.lock.Lock()
defer me.lock.Unlock()
if me.names == nil {
go me.maybeRefresh()
return
}
me.names[name] = true
}
func NewDirCache(dir string, ttlNs int64) *DirCache {
dc := new(DirCache)
dc.dir = dir
dc.ttlNs = ttlNs
return dc
}
func (me *DirCache) HasEntry(name string) (mapPresent bool, found bool) {
me.lock.RLock()
defer me.lock.RUnlock()
if me.names == nil {
go me.maybeRefresh()
return false, false
}
return true, me.names[name]
}
package unionfs
import (
"log"
"sync"
"time"
)
var _ = log.Println
type cacheEntry struct {
data interface{}
// expiryNs is the absolute timestamp of the expiry.
expiryNs int64
}
// TimedIntCache caches the result of fetch() for some time.
//
// Oh, how I wish we had generics.
type TimedCache struct {
fetch func(name string) interface{}
// ttlNs is a duration of the cache.
ttlNs int64
cacheMapMutex sync.RWMutex
cacheMap map[string]*cacheEntry
}
const layerCacheTimeoutNs = 1e9
func NewTimedCache(fetcher func(name string) interface{}, ttlNs int64) *TimedCache {
l := new(TimedCache)
l.ttlNs = ttlNs
l.fetch = fetcher
l.cacheMap = make(map[string]*cacheEntry)
return l
}
func (me *TimedCache) Get(name string) interface{} {
me.cacheMapMutex.RLock()
info, ok := me.cacheMap[name]
me.cacheMapMutex.RUnlock()
now := time.Nanoseconds()
if ok && info.expiryNs > now {
return info.data
}
return me.getDataNoCache(name)
}
func (me *TimedCache) Set(name string, val interface{}) {
me.cacheMapMutex.Lock()
defer me.cacheMapMutex.Unlock()
me.cacheMap[name] = &cacheEntry{
data: val,
expiryNs: time.Nanoseconds() + me.ttlNs,
}
}
func (me *TimedCache) getDataNoCache(name string) interface{} {
data := me.fetch(name)
me.Set(name, data)
return data
}
// Drop all expired entries.
func (me *TimedCache) Purge() {
keys := make([]string, 0, len(me.cacheMap))
now := time.Nanoseconds()
me.cacheMapMutex.Lock()
defer me.cacheMapMutex.Unlock()
for k, v := range me.cacheMap {
if v.expiryNs < now {
keys = append(keys, k)
}
}
for _, k := range keys {
me.cacheMap[k] = nil, false
}
}
package unionfs
import (
"fmt"
"log"
"time"
"testing"
)
var _ = fmt.Print
var _ = log.Print
func TestTimedIntCache(t *testing.T) {
fetchCount := 0
fetch := func(n string) interface{} {
fetchCount++
i := int(n[0])
return &i
}
var ttl int64
ttl = 1e6
cache := NewTimedCache(fetch, ttl)
v := cache.Get("n").(*int)
if *v != int('n') {
t.Error("value mismatch", v)
}
if fetchCount != 1 {
t.Error("fetch count mismatch", fetchCount)
}
// The cache update is async.
time.Sleep(ttl / 10)
w := cache.Get("n")
if v != w {
t.Error("Huh, inconsistent.")
}
if fetchCount > 1 {
t.Error("fetch count fail.", fetchCount)
}
time.Sleep(ttl * 2)
cache.Purge()
w = cache.Get("n")
if fetchCount == 1 {
t.Error("did not fetch again. Purge unsuccessful?")
}
}
This diff is collapsed.
package unionfs
import (
"os"
"github.com/hanwen/go-fuse/fuse"
"fmt"
"log"
"testing"
)
var _ = fmt.Print
var _ = log.Print
var CheckSuccess = fuse.CheckSuccess
func TestFilePathHash(t *testing.T) {
// Simple test coverage.
t.Log(filePathHash("xyz/abc"))
}
var testOpts = UnionFsOptions{
DeletionCacheTTLSecs: 0.01,
DeletionDirName: "DELETIONS",
BranchCacheTTLSecs: 0.01,
}
func setup(t *testing.T) (workdir string, state *fuse.MountState) {
wd := fuse.MakeTempDir()
err := os.Mkdir(wd+"/mount", 0700)
fuse.CheckSuccess(err)
err = os.Mkdir(wd+"/rw", 0700)
fuse.CheckSuccess(err)
os.Mkdir(wd+"/ro", 0700)
fuse.CheckSuccess(err)
var roots []string
roots = append(roots, wd+"/rw")
roots = append(roots, wd+"/ro")
ufs := NewUnionfs(roots, testOpts)
connector := fuse.NewPathFileSystemConnector(ufs)
state = fuse.NewMountState(connector)
state.Mount(wd + "/mount")
go state.Loop(false)
return wd, state
}
func writeToFile(path string, contents string, create bool) {
var flags int = os.O_WRONLY
if create {
flags |= os.O_CREATE
}
f, err := os.OpenFile(path, flags, 0644)
fuse.CheckSuccess(err)
_, err = f.Write([]byte(contents))
fuse.CheckSuccess(err)
err = f.Close()
fuse.CheckSuccess(err)
}
func readFromFile(path string) string {
f, err := os.Open(path)
fuse.CheckSuccess(err)
fi, err := os.Stat(path)
content := make([]byte, fi.Size)
n, err := f.Read(content)
fuse.CheckSuccess(err)
if n < int(fi.Size) {
panic("short read.")
}
err = f.Close()
fuse.CheckSuccess(err)
return string(content)
}
func dirNames(path string) map[string]bool {
f, err := os.Open(path)
fuse.CheckSuccess(err)
result := make(map[string]bool)
names, err := f.Readdirnames(-1)
fuse.CheckSuccess(err)
err = f.Close()
CheckSuccess(err)
for _, nm := range names {
result[nm] = true
}
return result
}
func checkMapEq(t *testing.T, m1, m2 map[string]bool) {
if !mapEq(m1, m2) {
msg := fmt.Sprintf("mismatch: got %v != expect %v", m1, m2)
log.Print(msg)
t.Error(msg)
}
}
func mapEq(m1, m2 map[string]bool) bool {
if len(m1) != len(m2) {
return false
}
for k, v := range m1 {
ok, val := m2[k]
if !ok || val != v {
return false
}
}
return true
}
func fileExists(path string) bool {
f, err := os.Lstat(path)
return err == nil && f != nil
}
func remove(path string) {
err := os.Remove(path)
fuse.CheckSuccess(err)
}
func TestSymlink(t *testing.T) {
wd, state := setup(t)
defer state.Unmount()
err := os.Symlink("/foobar", wd+"/mount/link")
CheckSuccess(err)
val, err := os.Readlink(wd + "/mount/link")
CheckSuccess(err)
if val != "/foobar" {
t.Errorf("symlink mismatch: %v", val)
}
}
func TestBasic(t *testing.T) {
wd, state := setup(t)
defer state.Unmount()
writeToFile(wd+"/rw/rw", "a", true)
writeToFile(wd+"/ro/ro1", "a", true)
writeToFile(wd+"/ro/ro2", "b", true)
names := dirNames(wd + "/mount")
expected := map[string]bool{
"rw": true, "ro1": true, "ro2": true,
}
checkMapEq(t, names, expected)
writeToFile(wd+"/mount/new", "new contents", true)
if !fileExists(wd + "/rw/new") {
t.Errorf("missing file in rw layer", names)
}
if readFromFile(wd+"/mount/new") != "new contents" {
t.Errorf("read mismatch.")
}
writeToFile(wd+"/mount/ro1", "promote me", false)
if !fileExists(wd + "/rw/ro1") {
t.Errorf("missing file in rw layer", names)
}
remove(wd + "/mount/new")
names = dirNames(wd + "/mount")
checkMapEq(t, names, map[string]bool{
"rw": true, "ro1": true, "ro2": true,
})
names = dirNames(wd + "/rw")
checkMapEq(t, names, map[string]bool{
testOpts.DeletionDirName: true,
"rw": true, "ro1": true,
})
names = dirNames(wd + "/rw/" + testOpts.DeletionDirName)
if len(names) != 0 {
t.Errorf("Expected 0 entry in %v", names)
}
remove(wd + "/mount/ro1")
names = dirNames(wd + "/mount")
checkMapEq(t, names, map[string]bool{
"rw": true, "ro2": true,
})
names = dirNames(wd + "/rw")
checkMapEq(t, names, map[string]bool{
"rw": true, testOpts.DeletionDirName: true,
})
names = dirNames(wd + "/rw/" + testOpts.DeletionDirName)
if len(names) != 1 {
t.Errorf("Expected 1 entry in %v", names)
}
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment