Commit bf30f7b8 authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

Handle EOF when splicing.

parent ce49cdd4
...@@ -139,13 +139,14 @@ func TestReadLarge(t *testing.T) { ...@@ -139,13 +139,14 @@ func TestReadLarge(t *testing.T) {
ts := NewTestCase(t) ts := NewTestCase(t)
defer ts.Cleanup() defer ts.Cleanup()
content := make([]byte, 1024*1024) // Add a bit more to test the splicing at the end.
content := make([]byte, 1024*1024 + 43)
for i := range content { for i := range content {
content[i] = byte(i) content[i] = byte(i)
} }
err := ioutil.WriteFile(ts.origFile, []byte(content), 0644) err := ioutil.WriteFile(ts.origFile, []byte(content), 0644)
CheckSuccess(err) CheckSuccess(err)
back, err := ioutil.ReadFile(ts.mountFile) back, err := ioutil.ReadFile(ts.mountFile)
CheckSuccess(err) CheckSuccess(err)
if bytes.Compare(content, back) != 0 { if bytes.Compare(content, back) != 0 {
......
...@@ -3,6 +3,7 @@ package fuse ...@@ -3,6 +3,7 @@ package fuse
import ( import (
"fmt" "fmt"
"log" "log"
"io"
"os" "os"
"strings" "strings"
"sync" "sync"
...@@ -156,7 +157,7 @@ func (ms *MountState) OperationCounts() map[string]int { ...@@ -156,7 +157,7 @@ func (ms *MountState) OperationCounts() map[string]int {
func (ms *MountState) BufferPoolStats() string { func (ms *MountState) BufferPoolStats() string {
s := ms.buffers.String() s := ms.buffers.String()
var r int var r int
ms.reqMu.Lock() ms.reqMu.Lock()
r = len(ms.readPool) + ms.reqReaders r = len(ms.readPool) + ms.reqReaders
ms.reqMu.Unlock() ms.reqMu.Unlock()
...@@ -279,7 +280,7 @@ func (ms *MountState) loop(exitIdle bool) { ...@@ -279,7 +280,7 @@ func (ms *MountState) loop(exitIdle bool) {
if req == nil { if req == nil {
break exit break exit
} }
case ENOENT: case ENOENT:
continue continue
case ENODEV: case ENODEV:
// unmount // unmount
...@@ -326,7 +327,7 @@ func (ms *MountState) handleRequest(req *request) { ...@@ -326,7 +327,7 @@ func (ms *MountState) handleRequest(req *request) {
func (ms *MountState) AllocOut(req *request, size uint32) []byte { func (ms *MountState) AllocOut(req *request, size uint32) []byte {
if cap(req.bufferPoolOutputBuf) >= int(size) { if cap(req.bufferPoolOutputBuf) >= int(size) {
req.bufferPoolOutputBuf = req.bufferPoolOutputBuf[:size] req.bufferPoolOutputBuf = req.bufferPoolOutputBuf[:size]
return req.bufferPoolOutputBuf return req.bufferPoolOutputBuf
} }
if req.bufferPoolOutputBuf != nil { if req.bufferPoolOutputBuf != nil {
ms.buffers.FreeBuffer(req.bufferPoolOutputBuf) ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
...@@ -364,6 +365,7 @@ func (ms *MountState) write(req *request) Status { ...@@ -364,6 +365,7 @@ func (ms *MountState) write(req *request) Status {
if err := ms.TrySplice(header, req); err == nil { if err := ms.TrySplice(header, req); err == nil {
return OK return OK
} else { } else {
log.Println("Splice error", err)
buf := ms.AllocOut(req, uint32(req.flatData.FdSize)) buf := ms.AllocOut(req, uint32(req.flatData.FdSize))
req.flatData.Read(buf) req.flatData.Read(buf)
header = req.serializeHeader() header = req.serializeHeader()
...@@ -385,25 +387,45 @@ func (ms *MountState) TrySplice(header []byte, req *request) error { ...@@ -385,25 +387,45 @@ func (ms *MountState) TrySplice(header []byte, req *request) error {
if !finalSplice.Grow(total) { if !finalSplice.Grow(total) {
return fmt.Errorf("splice.Grow failed.") return fmt.Errorf("splice.Grow failed.")
} }
_, err = finalSplice.Write(header) _, err = finalSplice.Write(header)
if err != nil { if err != nil {
return err return err
} }
n, err := finalSplice.LoadFromAt(req.flatData.Fd, req.flatData.FdSize, req.flatData.FdOff) var n int
if req.flatData.FdOff < 0 {
n, err = finalSplice.LoadFrom(req.flatData.Fd, req.flatData.FdSize)
} else {
n, err = finalSplice.LoadFromAt(req.flatData.Fd, req.flatData.FdSize, req.flatData.FdOff)
}
if err == io.EOF || (err == nil && n < req.flatData.FdSize && n > 0) {
discard := make([]byte, len(header))
_, err = finalSplice.Read(discard)
if err != nil {
return err
}
// TODO - fix debug output.
req.flatData.FdSize = n
req.flatData.Fd = finalSplice.ReadFd()
req.flatData.FdOff = -1
header = req.serializeHeader()
return ms.TrySplice(header, req)
}
if err != nil { if err != nil {
// TODO - handle EOF.
// TODO - extract the data from splice. // TODO - extract the data from splice.
return err return err
} }
if n != req.flatData.FdSize { if n != req.flatData.FdSize {
return fmt.Errorf("Size mismatch %d %d", n, req.flatData.FdSize) return fmt.Errorf("splice: wrote %d, want %d", n, req.flatData.FdSize)
} }
_, err = finalSplice.WriteTo(ms.mountFile.Fd(), total) _, err = finalSplice.WriteTo(ms.mountFile.Fd(), total)
if err != nil { if err != nil {
return fmt.Errorf("Splice write %v", err) return fmt.Errorf("splice write: %v", err)
} }
return nil return nil
} }
......
...@@ -12,6 +12,8 @@ type ReadResult struct { ...@@ -12,6 +12,8 @@ type ReadResult struct {
// If Data is nil and Status OK, splice from the following // If Data is nil and Status OK, splice from the following
// file. // file.
Fd uintptr Fd uintptr
// Offset within Fd, or -1 to use current offset.
FdOff int64 FdOff int64
FdSize int FdSize int
} }
...@@ -47,6 +49,6 @@ func (r *ReadResult) Read(buf []byte) Status { ...@@ -47,6 +49,6 @@ func (r *ReadResult) Read(buf []byte) Status {
r.Fd = 0 r.Fd = 0
r.FdOff = 0 r.FdOff = 0
r.FdSize = 0 r.FdSize = 0
return r.Status return r.Status
} }
...@@ -52,6 +52,14 @@ func (p *Pair) Read(d []byte) (n int, err error) { ...@@ -52,6 +52,14 @@ func (p *Pair) Read(d []byte) (n int, err error) {
return p.r.Read(d) return p.r.Read(d)
} }
func (p *Pair) ReadFd() uintptr {
return p.r.Fd()
}
func (p *Pair) WriteFd() uintptr {
return p.w.Fd()
}
func (p *Pair) Write(d []byte) (n int, err error) { func (p *Pair) Write(d []byte) (n int, err error) {
return p.w.Write(d) return p.w.Write(d)
} }
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment