fuse.go 7.79 KB
Newer Older
1 2 3
// Code that handles the control loop, and en/decoding messages
// to/from the kernel.  Dispatches calls into RawFileSystem.

Ivan Krasin's avatar
Ivan Krasin committed
4 5 6 7
package fuse

import (
	"fmt"
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
8
	"log"
Ivan Krasin's avatar
Ivan Krasin committed
9
	"os"
10
	"reflect"
11
	"strings"
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
12
	"syscall"
13
	"time"
14
	"unsafe"
15 16
)

17
// TODO make generic option setting.
Ivan Krasin's avatar
Ivan Krasin committed
18
const (
19 20
	// bufSize should be a power of two to minimize lossage in
	// BufferPool.
21
	bufSize = (1 << 16)
22
	maxRead = bufSize - PAGESIZE
Ivan Krasin's avatar
Ivan Krasin committed
23 24
)

25
type request struct {
26 27 28
	inputBuf []byte

	// These split up inputBuf.
29 30 31
	inHeader *InHeader      // generic header
	inData   unsafe.Pointer // per op data
	arg      []byte         // flat data.
32

33
	// Unstructured data, a pointer to the relevant XxxxOut struct.
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
34
	outData  unsafe.Pointer
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
35
	status   Status
36 37
	flatData []byte

38
	// Header + structured data for what we send back to the kernel.
39
	// May be followed by flatData.
40
	outHeaderBytes []byte
41 42

	// Start timestamp for timing info.
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
43
	startNs    int64
44
	preWriteNs int64
45 46
}

47 48 49 50 51 52 53 54
// filename returns the flat argument data as a string with all
// trailing NUL bytes stripped.
func (me *request) filename() string {
	raw := string(me.arg)
	return strings.TrimRight(raw, "\x00")
}

// filenames splits the flat argument data on NUL bytes. count is
// handed to strings.Split as the limit on the number of pieces.
func (me *request) filenames(count int) []string {
	raw := string(me.arg)
	return strings.Split(raw, "\x00", count)
}

55 56 57

////////////////////////////////////////////////////////////////
// State related to this mount point.
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
58 59 60 61 62 63
type MountState struct {
	// Empty if unmounted.
	mountPoint string
	fileSystem RawFileSystem

	// I/O with kernel and daemon.
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
64
	mountFile *os.File
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
65

66
	// Dump debug info onto stdout.
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
67
	Debug bool
68

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
69
	// For efficient reads and writes.
70
	buffers *BufferPool
71

72
	*LatencyMap
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
73 74 75
}

// Mount filesystem on mountPoint.
76
func (me *MountState) Mount(mountPoint string) os.Error {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
77
	file, mp, err := mount(mountPoint)
78
	if err != nil {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
79
		return err
80
	}
81 82
	me.mountPoint = mp
	me.mountFile = file
83 84
	return nil
}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
85

86 87 88 89 90 91 92 93
// SetRecordStatistics switches latency recording on or off. Turning
// it on installs a fresh LatencyMap; turning it off drops the
// current one.
func (me *MountState) SetRecordStatistics(record bool) {
	if !record {
		me.LatencyMap = nil
		return
	}
	me.LatencyMap = NewLatencyMap()
}

94
func (me *MountState) Unmount() os.Error {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
95
	// Todo: flush/release all files/dirs?
96
	result := unmount(me.mountPoint)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
97
	if result == nil {
98
		me.mountPoint = ""
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
99 100 101 102
	}
	return result
}

103
func (me *MountState) Write(req *request) {
104
	if me.LatencyMap != nil {
105 106
		req.preWriteNs = time.Nanoseconds()
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
107

108
	if req.outHeaderBytes == nil {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
109 110
		return
	}
111

112 113
	var err os.Error
	if req.flatData == nil {
114
		_, err = me.mountFile.Write(req.outHeaderBytes)
115 116
	} else {
		_, err = Writev(me.mountFile.Fd(),
117
			[][]byte{req.outHeaderBytes, req.flatData})
118 119
	}

120
	if err != nil {
121 122
		log.Printf("writer: Write/Writev %v failed, err: %v. Opcode: %v",
			req.outHeaderBytes, err, operationName(req.inHeader.Opcode))
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
123 124 125 126
	}
}

func NewMountState(fs RawFileSystem) *MountState {
127 128 129 130 131
	me := new(MountState)
	me.mountPoint = ""
	me.fileSystem = fs
	me.buffers = NewBufferPool()
	return me
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
132 133
}

134
func (me *MountState) Latencies() map[string]float64 {
135
	return me.LatencyMap.Latencies(1e-3)
136 137
}

138 139
func (me *MountState) OperationCounts() map[string]int {
	return me.LatencyMap.Counts()
140 141
}

142
func (me *MountState) BufferPoolStats() string {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
143
	return me.buffers.String()
144 145
}

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
146 147 148
////////////////////////////////////////////////////////////////
// Logic for the control loop.

149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
func (me *MountState) newRequest(oldReq *request) *request {
	if oldReq != nil {
		me.buffers.FreeBuffer(oldReq.flatData)

		*oldReq = request{
		status: OK,
		inputBuf: oldReq.inputBuf[0:bufSize],
		}
		return oldReq
	} 
		
	return &request{
		status: OK,
		inputBuf: me.buffers.AllocBuffer(bufSize),
	}
164 165
}

166
func (me *MountState) readRequest(req *request) os.Error {
167
	n, err := me.mountFile.Read(req.inputBuf)
168 169
	// If we start timing before the read, we may take into
	// account waiting for input into the timing.
170
	if me.LatencyMap != nil {
171 172
		req.startNs = time.Nanoseconds()
	}
173
	req.inputBuf = req.inputBuf[0:n]
174
	return err
175 176
}

177
func (me *MountState) discardRequest(req *request) {
178
	if me.LatencyMap != nil {
179 180 181
		endNs := time.Nanoseconds()
		dt := endNs - req.startNs

182
		opname := operationName(req.inHeader.Opcode)
183 184
		me.LatencyMap.AddMany(
			[]LatencyArg{
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
185 186
				{opname, "", dt},
				{opname + "-write", "", endNs - req.preWriteNs}})
187
	}
188 189
}

190 191 192
// Normally, callers should run Loop() and wait for FUSE to exit, but
// tests will want to run this in a goroutine.
//
193
// If threaded is given, each filesystem operation executes in a
194 195
// separate goroutine.
func (me *MountState) Loop(threaded bool) {
196 197 198 199 200 201 202 203 204 205 206 207 208
	// To limit scheduling overhead, we spawn multiple read loops.
	// This means that the request once read does not need to be
	// assigned to another thread, so it avoids a context switch.
	if threaded {
		for i := 0; i < _BACKGROUND_TASKS; i++ {
			go me.loop()
		}
	}
	me.loop()
	me.mountFile.Close()
}

func (me *MountState) loop() {
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
209
	// See fuse_kern_chan_receive()
210
	var lastReq *request
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
211
	for {
212 213
		req := me.newRequest(lastReq)
		lastReq = req
214
		err := me.readRequest(req)
215
		if err != nil {
216
			errNo := OsErrorToErrno(err)
217
 
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
218 219
			// Retry.
			if errNo == syscall.ENOENT {
220
				me.discardRequest(req)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
221 222 223 224 225
				continue
			}

			// According to fuse_chan_receive()
			if errNo == syscall.ENODEV {
226
				break
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
227 228 229 230
			}

			// What I see on linux-x86 2.6.35.10.
			if errNo == syscall.ENOSYS {
231
				break
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
232
			}
233

234
			log.Printf("Failed to read from fuse conn: %v", err)
235 236
			break
		}
237
		me.handle(req)
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
238
	}
239 240
}

241

242
func (me *MountState) chopMessage(req *request) *operationHandler {
243 244
	inHSize := unsafe.Sizeof(InHeader{})
	if len(req.inputBuf) < inHSize {
245
		log.Printf("Short read for input header: %v", req.inputBuf)
246
		return nil
247
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
248

249 250
	req.inHeader = (*InHeader)(unsafe.Pointer(&req.inputBuf[0]))
	req.arg = req.inputBuf[inHSize:]
251

252 253
	handler := getHandler(req.inHeader.Opcode)
	if handler == nil || handler.Func == nil {
254
		log.Printf("Unknown opcode %v", req.inHeader.Opcode)
255
		req.status = ENOSYS
256
		return handler
257
	}
258

259
	if len(req.arg) < handler.InputSize {
260
		log.Printf("Short read for %v: %v", req.inHeader.Opcode, req.arg)
261
		req.status = EIO
262
		return handler
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
263 264
	}

265
	if handler.InputSize > 0 {
266
		req.inData = unsafe.Pointer(&req.arg[0])
267
		req.arg = req.arg[handler.InputSize:]
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
268
	}
269
	return handler
270 271 272 273
}

func (me *MountState) handle(req *request) {
	defer me.discardRequest(req)
274 275 276
	handler := me.chopMessage(req)

	if handler == nil {
277 278
		return
	}
279

280
	if req.status == OK {
281
		me.dispatch(req, handler)
282
	}
Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
283

284 285 286 287
	// If we try to write OK, nil, we will get
	// error:  writer: Writev [[16 0 0 0 0 0 0 0 17 0 0 0 0 0 0 0]]
	// failed, err: writev: no such file or directory
	if req.inHeader.Opcode != FUSE_FORGET {
288
		serialize(req, handler, me.Debug)
289 290 291 292
		me.Write(req)
	}
}

293
func (me *MountState) dispatch(req *request, handler *operationHandler) {
294
	if me.Debug {
295 296 297 298 299 300 301 302 303
		handler := getHandler(req.inHeader.Opcode)
		var names interface{}
		if handler.FileNames > 0 {
			names = req.filenames(handler.FileNames)
		} else {
			names = ""
		}
		log.Printf("Dispatch: %v, NodeId: %v %v\n",
			operationName(req.inHeader.Opcode), req.inHeader.NodeId, names)
304
	}
305
	handler.Func(me, req)
306 307 308 309 310 311
}

// Thanks to Andrew Gerrand for this hack.
func asSlice(ptr unsafe.Pointer, byteCount int) []byte {
	h := &reflect.SliceHeader{uintptr(ptr), byteCount, byteCount}
	return *(*[]byte)(unsafe.Pointer(h))
312 313
}

314 315
func serialize(req *request, handler *operationHandler, debug bool) {
	dataLength := handler.OutputSize
316
	if req.outData == nil || req.status != OK {
317
		dataLength = 0
318
	}
319

320
	sizeOfOutHeader := unsafe.Sizeof(OutHeader{})
321

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
322
	req.outHeaderBytes = make([]byte, sizeOfOutHeader+dataLength)
323 324 325 326
	outHeader := (*OutHeader)(unsafe.Pointer(&req.outHeaderBytes[0]))
	outHeader.Unique = req.inHeader.Unique
	outHeader.Status = -req.status
	outHeader.Length = uint32(sizeOfOutHeader + dataLength + len(req.flatData))
327

328
	copy(req.outHeaderBytes[sizeOfOutHeader:], asSlice(req.outData, dataLength))
329
	if debug {
330
		val := fmt.Sprintf("%v", replyString(req.inHeader.Opcode, req.outData))
331 332
		max := 1024
		if len(val) > max {
333
			val = val[:max] + fmt.Sprintf(" ...trimmed (response size %d)", outHeader.Length)
334
		}
335

Han-Wen Nienhuys's avatar
Han-Wen Nienhuys committed
336 337 338 339 340 341
		msg := ""
		if len(req.flatData) > 0 {
			msg = fmt.Sprintf(" flat: %d\n", len(req.flatData))
		}
		log.Printf("Serialize: %v code: %v value: %v%v",
			operationName(req.inHeader.Opcode), req.status, val, msg)
342
	}
343
}