Commit e028a29e authored by Han-Wen Nienhuys's avatar Han-Wen Nienhuys

fuse: interrupt handling

Use linear search to find requests to interrupt. This assumes that
interrupts are rare, and the number of outstanding requests when it
happens is low.

Interrupted requests are not put back into the request pool to avoid
racing with in-progress mutators.
parent 63a16e74
...@@ -10,6 +10,7 @@ import ( ...@@ -10,6 +10,7 @@ import (
"log" "log"
"reflect" "reflect"
"runtime" "runtime"
"time"
"unsafe" "unsafe"
) )
...@@ -458,6 +459,23 @@ func doSetLkw(server *Server, req *request) { ...@@ -458,6 +459,23 @@ func doSetLkw(server *Server, req *request) {
} }
func doInterrupt(server *Server, req *request) { func doInterrupt(server *Server, req *request) {
input := (*InterruptIn)(req.inData)
server.reqMu.Lock()
defer server.reqMu.Unlock()
// This is slow, but this operation is rare.
for _, inflight := range server.reqInflight {
if input.Unique == inflight.inHeader.Unique && !inflight.interrupted {
close(inflight.cancel)
inflight.interrupted = true
req.status = OK
return
}
}
// not found; wait for a bit
time.Sleep(10 * time.Microsecond)
req.status = EAGAIN
} }
//////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////
......
...@@ -17,6 +17,13 @@ var sizeOfOutHeader = unsafe.Sizeof(OutHeader{}) ...@@ -17,6 +17,13 @@ var sizeOfOutHeader = unsafe.Sizeof(OutHeader{})
var zeroOutBuf [outputHeaderSize]byte var zeroOutBuf [outputHeaderSize]byte
type request struct { type request struct {
inflightIndex int
cancel chan struct{}
// written under Server.reqMu
interrupted bool
inputBuf []byte inputBuf []byte
// These split up inputBuf. // These split up inputBuf.
...@@ -144,16 +151,18 @@ func (r *request) setInput(input []byte) bool { ...@@ -144,16 +151,18 @@ func (r *request) setInput(input []byte) bool {
return true return true
} }
func (r *request) parse() { func (r *request) parseHeader() Status {
inHSize := int(unsafe.Sizeof(InHeader{})) if len(r.inputBuf) < int(unsafe.Sizeof(InHeader{})) {
if len(r.inputBuf) < inHSize {
log.Printf("Short read for input header: %v", r.inputBuf) log.Printf("Short read for input header: %v", r.inputBuf)
return return EINVAL
} }
r.inHeader = (*InHeader)(unsafe.Pointer(&r.inputBuf[0])) r.inHeader = (*InHeader)(unsafe.Pointer(&r.inputBuf[0]))
r.arg = r.inputBuf[:] return OK
}
func (r *request) parse() {
r.arg = r.inputBuf[:]
r.handler = getHandler(r.inHeader.Opcode) r.handler = getHandler(r.inHeader.Opcode)
if r.handler == nil { if r.handler == nil {
log.Printf("Unknown opcode %d", r.inHeader.Opcode) log.Printf("Unknown opcode %d", r.inHeader.Opcode)
...@@ -171,7 +180,7 @@ func (r *request) parse() { ...@@ -171,7 +180,7 @@ func (r *request) parse() {
r.inData = unsafe.Pointer(&r.arg[0]) r.inData = unsafe.Pointer(&r.arg[0])
r.arg = r.arg[r.handler.InputSize:] r.arg = r.arg[r.handler.InputSize:]
} else { } else {
r.arg = r.arg[inHSize:] r.arg = r.arg[unsafe.Sizeof(InHeader{}):]
} }
count := r.handler.FileNames count := r.handler.FileNames
...@@ -198,6 +207,7 @@ func (r *request) parse() { ...@@ -198,6 +207,7 @@ func (r *request) parse() {
copy(r.outBuf[:r.handler.OutputSize+sizeOfOutHeader], copy(r.outBuf[:r.handler.OutputSize+sizeOfOutHeader],
zeroOutBuf[:r.handler.OutputSize+sizeOfOutHeader]) zeroOutBuf[:r.handler.OutputSize+sizeOfOutHeader])
} }
func (r *request) outData() unsafe.Pointer { func (r *request) outData() unsafe.Pointer {
......
...@@ -49,6 +49,7 @@ type Server struct { ...@@ -49,6 +49,7 @@ type Server struct {
readPool sync.Pool readPool sync.Pool
reqMu sync.Mutex reqMu sync.Mutex
reqReaders int reqReaders int
reqInflight []*request
kernelSettings InitIn kernelSettings InitIn
// in-flight notify-retrieve queries // in-flight notify-retrieve queries
...@@ -169,7 +170,11 @@ func NewServer(fs RawFileSystem, mountPoint string, opts *MountOptions) (*Server ...@@ -169,7 +170,11 @@ func NewServer(fs RawFileSystem, mountPoint string, opts *MountOptions) (*Server
singleReader: runtime.GOOS == "darwin", singleReader: runtime.GOOS == "darwin",
ready: make(chan error, 1), ready: make(chan error, 1),
} }
ms.reqPool.New = func() interface{} { return new(request) } ms.reqPool.New = func() interface{} {
return &request{
cancel: make(chan struct{}),
}
}
ms.readPool.New = func() interface{} { return make([]byte, o.MaxWrite+pageSize) } ms.readPool.New = func() interface{} { return make([]byte, o.MaxWrite+pageSize) }
mountPoint = filepath.Clean(mountPoint) mountPoint = filepath.Clean(mountPoint)
...@@ -281,6 +286,13 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { ...@@ -281,6 +286,13 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
gobbled := req.setInput(dest[:n]) gobbled := req.setInput(dest[:n])
ms.reqMu.Lock() ms.reqMu.Lock()
defer ms.reqMu.Unlock()
// Must parse request.Unique under lock
if status := req.parseHeader(); !status.Ok() {
return nil, status
}
req.inflightIndex = len(ms.reqInflight)
ms.reqInflight = append(ms.reqInflight, req)
if !gobbled { if !gobbled {
ms.readPool.Put(dest) ms.readPool.Put(dest)
dest = nil dest = nil
...@@ -290,14 +302,30 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) { ...@@ -290,14 +302,30 @@ func (ms *Server) readRequest(exitIdle bool) (req *request, code Status) {
ms.loops.Add(1) ms.loops.Add(1)
go ms.loop(true) go ms.loop(true)
} }
ms.reqMu.Unlock()
return req, OK return req, OK
} }
// returnRequest returns a request to the pool of unused requests. // returnRequest returns a request to the pool of unused requests.
func (ms *Server) returnRequest(req *request) { func (ms *Server) returnRequest(req *request) {
ms.reqMu.Lock()
this := req.inflightIndex
last := len(ms.reqInflight) - 1
if last != this {
ms.reqInflight[this] = ms.reqInflight[last]
ms.reqInflight[this].inflightIndex = this
}
ms.reqInflight = ms.reqInflight[:last]
interrupted := req.interrupted
ms.reqMu.Unlock()
ms.recordStats(req) ms.recordStats(req)
if interrupted {
// Don't reposses data, because someone might still
// be looking at it
return
}
if req.bufferPoolOutputBuf != nil { if req.bufferPoolOutputBuf != nil {
ms.buffers.FreeBuffer(req.bufferPoolOutputBuf) ms.buffers.FreeBuffer(req.bufferPoolOutputBuf)
...@@ -455,8 +483,12 @@ func (ms *Server) allocOut(req *request, size uint32) []byte { ...@@ -455,8 +483,12 @@ func (ms *Server) allocOut(req *request, size uint32) []byte {
func (ms *Server) write(req *request) Status { func (ms *Server) write(req *request) Status {
// Forget/NotifyReply do not wait for reply from filesystem server. // Forget/NotifyReply do not wait for reply from filesystem server.
switch req.inHeader.Opcode { switch req.inHeader.Opcode {
case _OP_FORGET, _OP_BATCH_FORGET, _OP_NOTIFY_REPLY, _OP_INTERRUPT: case _OP_FORGET, _OP_BATCH_FORGET, _OP_NOTIFY_REPLY:
return OK return OK
case _OP_INTERRUPT:
if req.status.Ok() {
return OK
}
} }
header := req.serializeHeader(req.flatDataSize()) header := req.serializeHeader(req.flatDataSize())
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment