Commit 8bf32fcb authored by Han-Wen Nienhuys

Reorganise request and buffer handling in main loop.

Add a test for memory pressure. This brings performance back to 50%
slower than libfuse, but it simplifies the structure for future
performance improvements.
parent 67ba7251
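
For orientation before the diff: instead of allocating a fresh read buffer on every loop iteration, MountState now keeps two mutex-guarded free lists (reqPool for request structs, readPool for read buffers) recycled through readRequest/returnRequest, with _MAX_READERS bounding how many loops sit blocked in read at once. A minimal sketch of that free-list pattern, simplified and not the actual go-fuse code:

package main

import "sync"

// freeList is a simplified stand-in for the reqPool/readPool slices
// this commit adds to MountState; a single mutex guards the list,
// like reqMu in the diff below.
type freeList struct {
	mu      sync.Mutex
	bufs    [][]byte
	bufSize int
}

// get reuses a pooled buffer when one is available and allocates a
// fresh one only when the pool is empty (compare readRequest below).
func (f *freeList) get() []byte {
	f.mu.Lock()
	defer f.mu.Unlock()
	if n := len(f.bufs); n > 0 {
		b := f.bufs[n-1]
		f.bufs = f.bufs[:n-1]
		return b
	}
	return make([]byte, f.bufSize)
}

// put hands a buffer back for later reuse (compare returnRequest below).
func (f *freeList) put(b []byte) {
	f.mu.Lock()
	f.bufs = append(f.bufs, b)
	f.mu.Unlock()
}

func main() {
	// Hypothetical size; the real code sizes read buffers as MaxWrite + 4096.
	fl := &freeList{bufSize: 64*1024 + 4096}
	b := fl.get()
	fl.put(b)
}
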
@@ -5,7 +5,6 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
@@ -38,12 +37,10 @@ type MountState struct {
opts *MountOptions
kernelSettings raw.InitIn
// Number of loops blocked on reading; used to control amount
// of concurrency.
readers int32
// Number of loops total. Needed for clean shutdown.
loops sync.WaitGroup
reqMu sync.Mutex
reqPool []*request
readPool [][]byte
reqReaders int
}
func (ms *MountState) KernelSettings() raw.InitIn {
@@ -152,11 +149,67 @@ func (ms *MountState) BufferPoolStats() string {
return ms.buffers.String()
}
func (ms *MountState) newRequest() *request {
r := &request{
pool: ms.buffers,
const _MAX_READERS = 10
func (ms *MountState) readRequest() (req *request, code Status) {
var dest []byte
ms.reqMu.Lock()
l := len(ms.reqPool)
if l > 0 {
req = ms.reqPool[l-1]
ms.reqPool = ms.reqPool[:l-1]
} else {
req = new(request)
}
l = len(ms.readPool)
if l > 0 {
dest = ms.readPool[l-1]
ms.readPool = ms.readPool[:l-1]
} else {
dest = make([]byte, ms.opts.MaxWrite + 4096)
}
return r
ms.reqReaders++
ms.reqMu.Unlock()
n, err := ms.mountFile.Read(dest)
if err != nil {
code = ToStatus(err)
ms.reqMu.Lock()
ms.reqPool = append(ms.reqPool, req)
ms.reqReaders--
ms.reqMu.Unlock()
return nil, code
}
gobbled := req.setInput(dest[:n])
ms.reqMu.Lock()
if !gobbled {
ms.readPool = append(ms.readPool, dest)
dest = nil
}
ms.reqReaders--
ms.reqMu.Unlock()
return req, OK
}
func (ms *MountState) returnRequest(req *request) {
ms.recordStats(req)
if req.bufferPoolOutputBuf != nil {
ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
req.bufferPoolOutputBuf = nil
}
req.clear()
ms.reqMu.Lock()
if req.bufferPoolInputBuf != nil {
ms.readPool = append(ms.readPool, req.bufferPoolInputBuf)
req.bufferPoolInputBuf = nil
}
ms.reqPool = append(ms.reqPool, req)
ms.reqMu.Unlock()
}
func (ms *MountState) recordStats(req *request) {
@@ -178,70 +231,35 @@ func (ms *MountState) recordStats(req *request) {
//
// Each filesystem operation executes in a separate goroutine.
func (ms *MountState) Loop() {
ms.loops.Add(1)
ms.loop()
ms.loops.Wait()
ms.mountFile.Close()
}
const _MAX_READERS = 10
func (ms *MountState) loop() {
var dest []byte
var req *request
exit:
for {
if dest == nil {
dest = ms.buffers.AllocBuffer(uint32(ms.opts.MaxWrite + 4096))
}
if atomic.AddInt32(&ms.readers, 0) > _MAX_READERS {
break
}
atomic.AddInt32(&ms.readers, 1)
n, err := ms.mountFile.Read(dest)
readers := atomic.AddInt32(&ms.readers, -1)
if err != nil {
errNo := ToStatus(err)
// Retry.
if errNo == ENOENT {
req, errNo := ms.readRequest()
switch errNo {
case OK:
case ENOENT:
continue
}
// Unmount.
if errNo == ENODEV {
break
}
case ENODEV:
// unmount
break exit
default: // some other error?
log.Printf("Failed to read from fuse conn: %v", errNo)
break
}
if readers <= 0 {
ms.loops.Add(1)
go ms.loop()
}
if req == nil {
req = ms.newRequest()
}
if ms.latencies != nil {
req.startNs = time.Now().UnixNano()
}
if req.setInput(dest[:n]) {
dest = nil
}
ms.handleRequest(req)
req.clear()
go ms.handleRequest(req)
}
ms.buffers.FreeBuffer(dest)
ms.loops.Done()
}
func (ms *MountState) handleRequest(req *request) {
defer req.Discard()
defer ms.recordStats(req)
defer ms.returnRequest(req)
req.parse()
if req.handler == nil {
@@ -268,6 +286,15 @@ func (ms *MountState) handleRequest(req *request) {
}
}
func (ms *MountState) AllocOut(req *request, size uint32) []byte {
if req.bufferPoolOutputBuf != nil {
ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
}
req.bufferPoolOutputBuf = ms.buffers.AllocBuffer(size)
return req.bufferPoolOutputBuf
}
func (ms *MountState) write(req *request) Status {
// Forget does not wait for reply.
if req.inHeader.Opcode == _OP_FORGET || req.inHeader.Opcode == _OP_BATCH_FORGET {
......
@@ -119,7 +119,7 @@ func doCreate(state *MountState, req *request) {
func doReadDir(state *MountState, req *request) {
in := (*ReadIn)(req.inData)
buf := req.AllocOut(in.Size)
buf := state.AllocOut(req, in.Size)
entries := NewDirEntryList(buf, uint64(in.Offset))
code := state.fileSystem.ReadDir(entries, req.inHeader, in)
@@ -254,7 +254,7 @@ func doLink(state *MountState, req *request) {
func doRead(state *MountState, req *request) {
in := (*ReadIn)(req.inData)
buf := req.AllocOut(in.Size)
buf := state.AllocOut(req, in.Size)
req.flatData, req.status = state.fileSystem.Read(req.inHeader, in, buf)
}
......
package fuse
import (
"fmt"
"io/ioutil"
"os"
"strings"
"sync"
"testing"
"time"
)
// This test checks that highly concurrent loads don't use a lot of
// memory if it is not needed: the input buffer must accommodate the
// max write size, but that much space is only really needed while we
// are processing writes.
type DelayFs struct {
DefaultFileSystem
}
func (d *DelayFs) GetAttr(name string, c *Context) (*Attr, Status) {
if name == "" || strings.HasSuffix(name, "dir") {
return &Attr{Mode: S_IFDIR | 0755}, OK
}
time.Sleep(time.Second)
return &Attr{Mode: S_IFREG | 0644}, OK
}
func TestMemoryPressure(t *testing.T) {
fs := &DelayFs{}
dir, err := ioutil.TempDir("", "go-fuse")
CheckSuccess(err)
nfs := NewPathNodeFs(fs, nil)
o := &FileSystemOptions{PortableInodes: true}
state, _, err := MountNodeFileSystem(dir, nfs, o)
if err != nil {
t.Fatalf("mount failed: %v", err)
}
go state.Loop()
defer state.Unmount()
state.Debug = VerboseTest()
// Wait for FS to get ready.
os.Lstat(dir)
var wg sync.WaitGroup
for i := 0; i < 10 * _MAX_READERS; i++ {
wg.Add(1)
go func(x int) {
fn := fmt.Sprintf("%s/%ddir/file%d", dir, x, x)
_, err := os.Lstat(fn)
if err != nil {
t.Errorf("parallel stat %q: %v", fn, err)
}
wg.Done()
}(i)
}
time.Sleep(100 * time.Millisecond)
created := state.buffers.createdBuffers
if created > 2 * _MAX_READERS {
t.Errorf("created %d buffers, max reader %d", created, _MAX_READERS)
}
wg.Wait()
}
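
The test above reads the unexported createdBuffers counter, which only works from inside the fuse package. Application code can get a rough equivalent through the exported BufferPoolStats accessor touched earlier in this diff. A hypothetical sketch (the import path is assumed for this era of go-fuse and the mountpoint is a placeholder; the setup otherwise mirrors the test):

package main

import (
	"log"
	"time"

	"github.com/hanwen/go-fuse/fuse" // assumed import path
)

func main() {
	// Same setup shape as TestMemoryPressure above, minus the test harness.
	fs := &fuse.DefaultFileSystem{}
	nfs := fuse.NewPathNodeFs(fs, nil)
	opts := &fuse.FileSystemOptions{PortableInodes: true}
	state, _, err := fuse.MountNodeFileSystem("/tmp/mnt", nfs, opts)
	if err != nil {
		log.Fatalf("mount failed: %v", err)
	}

	// Periodically log pool statistics; this is the exported view of
	// what the test checks via createdBuffers.
	go func() {
		for range time.Tick(time.Second) {
			log.Println("buffer pool:", state.BufferPoolStats())
		}
	}()

	state.Loop() // serves requests until the filesystem is unmounted
}
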
@@ -13,19 +13,6 @@ import (
var sizeOfOutHeader = unsafe.Sizeof(raw.OutHeader{})
var zeroOutBuf [160]byte
func (req *request) Discard() {
req.pool.FreeBuffer(req.bufferPoolOutputBuf)
req.pool.FreeBuffer(req.bufferPoolInputBuf)
}
func (req *request) AllocOut(size uint32) []byte {
if req.bufferPoolOutputBuf != nil {
req.pool.FreeBuffer(req.bufferPoolOutputBuf)
}
req.bufferPoolOutputBuf = req.pool.AllocBuffer(size)
return req.pool.AllocBuffer(size)
}
type request struct {
inputBuf []byte
@@ -52,7 +39,6 @@ type request struct {
// obtained through bufferpool.
bufferPoolInputBuf []byte
bufferPoolOutputBuf []byte
pool BufferPool
// For small pieces of data, we use the following inline
// arrays:
@@ -65,7 +51,6 @@ type request struct {
}
func (r *request) clear() {
r.bufferPoolInputBuf = nil
r.inputBuf = nil
r.inHeader = nil
r.inData = nil
......