Commit cb14b213 authored by Kirill Smelkov's avatar Kirill Smelkov

wcfs: Add FileSock FUSE utility

FileSock is bidirectional channel associated with opened file.

FileSock provides streaming write/read operations for filesystem server that
are correspondingly matched with read/write operations on filesystem user side.

WCFS will use FileSock to implement exchange over .wcfs/zhead and,
later, head/watch files.

Some preliminary history:

b17aeb8c    X Change FileSock to use xio.Pipe which is io.Pipe + support for IO cancellation
parent 23d8da82
...@@ -24,6 +24,7 @@ import ( ...@@ -24,6 +24,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"math"
"sync/atomic" "sync/atomic"
"syscall" "syscall"
...@@ -32,6 +33,8 @@ import ( ...@@ -32,6 +33,8 @@ import (
"github.com/hanwen/go-fuse/v2/fuse" "github.com/hanwen/go-fuse/v2/fuse"
"github.com/hanwen/go-fuse/v2/fuse/nodefs" "github.com/hanwen/go-fuse/v2/fuse/nodefs"
"github.com/pkg/errors" "github.com/pkg/errors"
"lab.nexedi.com/kirr/go123/xio"
) )
// ---- FUSE ---- // ---- FUSE ----
...@@ -263,6 +266,164 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve ...@@ -263,6 +266,164 @@ func mount(mntpt string, root nodefs.Node, opts *fuse.MountOptions) (*fuse.Serve
} }
// FileSock is bidirectional channel associated with opened file.
//
// FileSock provides streaming write/read operations for filesystem server that
// are correspondingly matched with read/write operations on filesystem user side.
type FileSock struct {
file *skFile // filesock's file peer
rx *xio.PipeReader // socket reads from file here
tx *xio.PipeWriter // socket writes to file here
}
// skFile is File peer of FileSock.
//
// skFile.Read is connected with sk.Write.
// skFile.Write is connected with sk.Read.
//
// skFile is always created with nonseekable & directio flags, to support
// streaming semantics.
type skFile struct {
nodefs.File
rx *xio.PipeReader // file reads from socket here
tx *xio.PipeWriter // file writes to socket here
}
// NewFileSock creates new file socket.
//
// After file socket is created, File return should be given to kernel for the
// socket to be connected to an opened file.
func NewFileSock() *FileSock {
sk := &FileSock{}
f := &skFile{
File: nodefs.NewDefaultFile(),
}
sk.file = f
rx, tx := xio.Pipe()
sk.rx = rx
f .tx = tx
rx, tx = xio.Pipe()
f .rx = rx
sk.tx = tx
return sk
}
// File returns nodefs.File handle that is connected to the socket.
//
// The handle should be given to kernel as result of a file open, for that file
// to be connected to the socket.
func (sk *FileSock) File() nodefs.File {
// nonseekable & directio for opened file to have streaming semantic as
// if it was a socket. FOPEN_STREAM is used so that both read and write
// could be run simultaneously: git.kernel.org/linus/10dce8af3422
return &nodefs.WithFlags{
File: sk.file,
FuseFlags: fuse.FOPEN_STREAM | fuse.FOPEN_NONSEEKABLE | fuse.FOPEN_DIRECT_IO,
}
}
// Write writes data to filesock.
//
// The data will be read by client reading from filesock's file.
// Write semantic is that of xio.Writer.
func (sk *FileSock) Write(ctx context.Context, data []byte) (n int, err error) {
return sk.tx.Write(ctx, data)
}
// Read implements nodefs.File and is paired with filesock.Write().
func (f *skFile) Read(dest []byte, /*ignored*/off int64, fctx *fuse.Context) (fuse.ReadResult, fuse.Status) {
n, err := f.rx.Read(fctx, dest)
if n != 0 {
err = nil
}
if err == io.EOF {
n = 0 // read(2): "zero indicates end of file"
err = nil
}
if err != nil {
return nil, err2LogStatus(err)
}
return fuse.ReadResultData(dest[:n]), fuse.OK
}
// Read reads data from filesock.
//
// The data read will be that the client writes into filesock's file.
// Read semantic is that of xio.Reader.
func (sk *FileSock) Read(ctx context.Context, dest []byte) (n int, err error) {
return sk.rx.Read(ctx, dest)
}
// Write implements nodefs.File and is paired with filesock.Read()
func (f *skFile) Write(data []byte, /*ignored*/off int64, fctx *fuse.Context) (uint32, fuse.Status) {
// cap data to 2GB (not 4GB not to overflow int on 32-bit platforms)
l := len(data)
if l > math.MaxInt32 {
l = math.MaxInt32
data = data[:l]
}
n, err := f.tx.Write(fctx, data)
if n != 0 {
err = nil
}
if err == io.ErrClosedPipe {
err = syscall.ECONNRESET
}
if err != nil {
return 0, err2LogStatus(err)
}
return uint32(n), fuse.OK
}
// CloseRead closes reading side of filesock.
func (sk *FileSock) CloseRead() error {
return sk.rx.Close()
}
// CloseWrite closes writing side of filesock.
func (sk *FileSock) CloseWrite() error {
return sk.tx.Close()
}
// Close closes filesock.
//
// it is semantically equivalent to CloseRead + CloseWrite.
func (sk *FileSock) Close() error {
err := sk.CloseRead()
err2 := sk.CloseWrite()
if err == nil {
err = err2
}
return err
}
// Release implements nodefs.File to handle when last user reference to the file is gone.
//
// Note: it is not Flush, since Fush is called on close(file) and in general
// multiple time (e.g. in case of duplicated file descriptor).
func (f *skFile) Release() {
err := f.rx.Close()
err2 := f.tx.Close()
if err == nil {
err = err2
}
// the kernel ignores any error return from release.
// close on pipes always returns nil and can be called multiple times.
if err != nil {
panic(err)
}
}
// ---- make df happy (else it complains "function not supported") ---- // ---- make df happy (else it complains "function not supported") ----
func (root *Root) StatFs() *fuse.StatfsOut { func (root *Root) StatFs() *fuse.StatfsOut {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment