Commit 4d044d88 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Make fuse requests an explicit private type, fuseRequest.

parent dcc775c6
......@@ -10,6 +10,7 @@ import (
"strings"
"sync"
"syscall"
"time"
)
// TODO make generic option setting.
......@@ -25,6 +26,25 @@ type Empty interface{}
////////////////////////////////////////////////////////////////
// State related to this mount point.
type fuseRequest struct {
inputBuf []byte
// These split up inputBuf.
inHeader InHeader
arg *bytes.Buffer
// Data for the output.
data interface{}
status Status
flatData []byte
// The stuff we send back to the kernel.
serialized [][]byte
// Start timestamp for timing info.
startNs int64
}
// TODO - should gather stats and expose those for performance tuning.
type MountState struct {
// We should store the RawFuseFile/Dirs on the Go side,
......@@ -46,7 +66,7 @@ type MountState struct {
// I/O with kernel and daemon.
mountFile *os.File
errorChannel chan os.Error
outputChannel chan [][]byte
outputChannel chan *fuseRequest
// Run each operation in its own Go-routine.
threaded bool
......@@ -127,7 +147,7 @@ func (me *MountState) Mount(mountPoint string) os.Error {
func (me *MountState) Loop(threaded bool) {
me.threaded = threaded
if me.threaded {
me.outputChannel = make(chan [][]byte, 100)
me.outputChannel = make(chan *fuseRequest, 100)
me.errorChannel = make(chan os.Error, 100)
go me.asyncWriterThread()
go me.DefaultErrorHandler()
......@@ -168,15 +188,15 @@ func (me *MountState) Error(err os.Error) {
}
}
func (me *MountState) Write(packet [][]byte) {
if packet == nil {
func (me *MountState) Write(req *fuseRequest) {
if req.serialized == nil {
return
}
if me.threaded {
me.outputChannel <- packet
me.outputChannel <- req
} else {
me.syncWrite(packet)
me.syncWrite(req)
}
}
......@@ -216,30 +236,44 @@ func (me *MountState) asyncWriterThread() {
}
}
func (me *MountState) syncWrite(packet [][]byte) {
_, err := Writev(me.mountFile.Fd(), packet)
func (me *MountState) syncWrite(req *fuseRequest) {
_, err := Writev(me.mountFile.Fd(), req.serialized)
if err != nil {
me.Error(os.NewError(fmt.Sprintf("writer: Writev %v failed, err: %v", packet, err)))
}
for _, v := range packet {
me.buffers.FreeBuffer(v)
me.Error(os.NewError(fmt.Sprintf("writer: Writev %v failed, err: %v", req.serialized, err)))
}
}
me.discardFuseRequest(req)
}
////////////////////////////////////////////////////////////////
// Logic for the control loop.
func (me *MountState) newFuseRequest() (*fuseRequest, os.Error) {
req := new(fuseRequest)
req.status = OK
req.startNs = time.Nanoseconds()
req.inputBuf = me.buffers.AllocBuffer(bufSize)
n, err := me.mountFile.Read(req.inputBuf)
req.inputBuf = req.inputBuf[0:n]
return req, err
}
func (me *MountState) discardFuseRequest(req *fuseRequest) {
me.buffers.FreeBuffer(req.inputBuf)
me.buffers.FreeBuffer(req.flatData)
}
func (me *MountState) loop() {
// See fuse_kern_chan_receive()
for {
buf := me.buffers.AllocBuffer(bufSize)
n, err := me.mountFile.Read(buf)
req, err := me.newFuseRequest()
if err != nil {
errNo := OsErrorToFuseError(err)
// Retry.
if errNo == syscall.ENOENT {
me.discardFuseRequest(req)
continue
}
......@@ -259,47 +293,44 @@ func (me *MountState) loop() {
}
if me.threaded {
go me.handle(buf[0:n])
go me.handle(req)
} else {
me.handle(buf[0:n])
me.handle(req)
}
}
me.mountFile.Close()
}
func (me *MountState) handle(in_data []byte) {
r := bytes.NewBuffer(in_data)
header := new(InHeader)
err := binary.Read(r, binary.LittleEndian, header)
func (me *MountState) handle(req *fuseRequest) {
req.arg = bytes.NewBuffer(req.inputBuf)
err := binary.Read(req.arg, binary.LittleEndian, &req.inHeader)
if err == os.EOF {
err = os.NewError(fmt.Sprintf("MountPoint, handle: can't read a header, in_data: %v", in_data))
err = os.NewError(fmt.Sprintf("MountPoint, handle: can't read a header, in_data: %v", req.inputBuf))
}
if err != nil {
me.Error(err)
return
}
me.Write(dispatch(me, header, r))
me.buffers.FreeBuffer(in_data)
me.dispatch(req)
me.Write(req)
}
func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]byte) {
func (me *MountState) dispatch(req *fuseRequest) {
// TODO - would be nice to remove this logging from the critical path.
state.operationCounts.Add(operationName(h.Opcode), 1)
h := &req.inHeader
me.operationCounts.Add(operationName(h.Opcode), 1)
input := newInput(h.Opcode)
if input != nil && !parseLittleEndian(arg, input) {
return serialize(h, EIO, nil, nil, false)
if input != nil && !parseLittleEndian(req.arg, input) {
req.status = EIO
serialize(req, me.Debug)
return
}
var out Empty
var status Status
var flatData []byte
out = nil
status = OK
fs := state.fileSystem
var status Status = OK
fs := me.fileSystem
filename := ""
// Perhaps a map is faster?
......@@ -307,10 +338,10 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
h.Opcode == FUSE_LOOKUP || h.Opcode == FUSE_MKDIR ||
h.Opcode == FUSE_MKNOD || h.Opcode == FUSE_CREATE ||
h.Opcode == FUSE_LINK {
filename = strings.TrimRight(string(arg.Bytes()), "\x00")
filename = strings.TrimRight(string(req.arg.Bytes()), "\x00")
}
if state.Debug {
if me.Debug {
nm := ""
if filename != "" {
nm = "n: '" + filename + "'"
......@@ -321,7 +352,7 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
// Follow ordering of fuse_lowlevel.h.
switch h.Opcode {
case FUSE_INIT:
out, status = initFuse(state, h, input.(*InitIn))
out, status = initFuse(me, h, input.(*InitIn))
case FUSE_DESTROY:
fs.Destroy(h, input.(*InitIn))
case FUSE_LOOKUP:
......@@ -331,12 +362,13 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
// If we try to write OK, nil, we will get
// error: writer: Writev [[16 0 0 0 0 0 0 0 17 0 0 0 0 0 0 0]]
// failed, err: writev: no such file or directory
return nil
me.discardFuseRequest(req)
return
case FUSE_GETATTR:
// TODO - if input.Fh is set, do file.GetAttr
out, status = fs.GetAttr(h, input.(*GetAttrIn))
case FUSE_SETATTR:
out, status = doSetattr(state, h, input.(*SetAttrIn))
out, status = doSetattr(me, h, input.(*SetAttrIn))
case FUSE_READLINK:
out, status = fs.Readlink(h)
case FUSE_MKNOD:
......@@ -348,14 +380,14 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
case FUSE_RMDIR:
status = fs.Rmdir(h, filename)
case FUSE_SYMLINK:
filenames := strings.Split(string(arg.Bytes()), "\x00", 3)
filenames := strings.Split(string(req.arg.Bytes()), "\x00", 3)
if len(filenames) >= 2 {
out, status = fs.Symlink(h, filenames[1], filenames[0])
} else {
status = EIO
}
case FUSE_RENAME:
filenames := strings.Split(string(arg.Bytes()), "\x00", 3)
filenames := strings.Split(string(req.arg.Bytes()), "\x00", 3)
if len(filenames) >= 2 {
status = fs.Rename(h, input.(*RenameIn), filenames[0], filenames[1])
} else {
......@@ -364,26 +396,26 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
case FUSE_LINK:
out, status = fs.Link(h, input.(*LinkIn), filename)
case FUSE_OPEN:
out, status = doOpen(state, h, input.(*OpenIn))
out, status = doOpen(me, h, input.(*OpenIn))
case FUSE_READ:
flatData, status = doRead(state, h, input.(*ReadIn), state.buffers)
req.flatData, status = doRead(me, h, input.(*ReadIn), me.buffers)
case FUSE_WRITE:
out, status = doWrite(state, h, input.(*WriteIn), arg.Bytes())
out, status = doWrite(me, h, input.(*WriteIn), req.arg.Bytes())
case FUSE_FLUSH:
out, status = doFlush(state, h, input.(*FlushIn))
out, status = doFlush(me, h, input.(*FlushIn))
case FUSE_RELEASE:
out, status = doRelease(state, h, input.(*ReleaseIn))
out, status = doRelease(me, h, input.(*ReleaseIn))
case FUSE_FSYNC:
status = doFsync(state, h, input.(*FsyncIn))
status = doFsync(me, h, input.(*FsyncIn))
case FUSE_OPENDIR:
out, status = doOpenDir(state, h, input.(*OpenIn))
out, status = doOpenDir(me, h, input.(*OpenIn))
case FUSE_READDIR:
out, status = doReadDir(state, h, input.(*ReadIn))
out, status = doReadDir(me, h, input.(*ReadIn))
case FUSE_RELEASEDIR:
out, status = doReleaseDir(state, h, input.(*ReleaseIn))
out, status = doReleaseDir(me, h, input.(*ReleaseIn))
case FUSE_FSYNCDIR:
// todo- check input type.
status = doFsyncDir(state, h, input.(*FsyncIn))
status = doFsyncDir(me, h, input.(*FsyncIn))
// TODO - implement XAttr routines.
// case FUSE_SETXATTR:
......@@ -396,7 +428,7 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
case FUSE_ACCESS:
status = fs.Access(h, input.(*AccessIn))
case FUSE_CREATE:
out, status = doCreate(state, h, input.(*CreateIn), filename)
out, status = doCreate(me, h, input.(*CreateIn), filename)
// TODO - implement file locking.
// case FUSE_SETLK
......@@ -410,49 +442,52 @@ func dispatch(state *MountState, h *InHeader, arg *bytes.Buffer) (outBytes [][]b
// TODO - figure out how to support this
// case FUSE_INTERRUPT
default:
state.Error(os.NewError(fmt.Sprintf("Unsupported OpCode: %d=%v", h.Opcode, operationName(h.Opcode))))
return serialize(h, ENOSYS, nil, nil, false)
me.Error(os.NewError(fmt.Sprintf("Unsupported OpCode: %d=%v", h.Opcode, operationName(h.Opcode))))
req.status = ENOSYS
serialize(req, me.Debug)
return
}
return serialize(h, status, out, flatData, state.Debug)
req.status = status
req.data = out
serialize(req, me.Debug)
}
func serialize(h *InHeader, res Status, out interface{}, flatData []byte, debug bool) [][]byte {
func serialize(req *fuseRequest, debug bool) {
out_data := make([]byte, 0)
b := new(bytes.Buffer)
if out != nil && res == OK {
err := binary.Write(b, binary.LittleEndian, out)
if req.data != nil && req.status == OK {
err := binary.Write(b, binary.LittleEndian, req.data)
if err == nil {
out_data = b.Bytes()
} else {
panic(fmt.Sprintf("Can't serialize out: %v, err: %v", out, err))
panic(fmt.Sprintf("Can't serialize out: %v, err: %v", req.data, err))
}
}
var hout OutHeader
hout.Unique = h.Unique
hout.Status = -res
hout.Length = uint32(len(out_data) + SizeOfOutHeader + len(flatData))
var hOut OutHeader
hOut.Unique = req.inHeader.Unique
hOut.Status = -req.status
hOut.Length = uint32(len(out_data) + SizeOfOutHeader + len(req.flatData))
b = new(bytes.Buffer)
err := binary.Write(b, binary.LittleEndian, &hout)
err := binary.Write(b, binary.LittleEndian, &hOut)
if err != nil {
panic("Can't serialize OutHeader")
}
data := [][]byte{b.Bytes(), out_data, flatData}
req.serialized = [][]byte{b.Bytes(), out_data, req.flatData}
if debug {
val := fmt.Sprintf("%v", out)
val := fmt.Sprintf("%v", req.data)
max := 1024
if len(val) > max {
val = val[:max] + fmt.Sprintf(" ...trimmed (response size %d)", hout.Length)
val = val[:max] + fmt.Sprintf(" ...trimmed (response size %d)", hOut.Length)
}
log.Printf("Serialize: %v code: %v value: %v flat: %d\n",
operationName(h.Opcode), res, val, len(flatData))
operationName(req.inHeader.Opcode), req.status, val, len(req.flatData))
}
return data
}
func initFuse(state *MountState, h *InHeader, input *InitIn) (Empty, Status) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment