Commit adb384ad authored by David du Colombier's avatar David du Colombier Committed by Brad Fitzpatrick

net: implement asynchonous cancelable I/O on Plan 9

This change is an experimental implementation of asynchronous
cancelable I/O operations on Plan 9, which are required to
implement deadlines.

There are no asynchronous syscalls on Plan 9. I/O operations
are performed with blocking pread and pwrite syscalls.

Implementing deadlines in Go requires a way to interrupt
I/O operations.

It is possible to interrupt reads and writes on a TCP connection
by forcing the closure of the TCP connection. This approach
has been used successfully in CL 31390.

However, we can't implement deadlines with this method, since
we require to be able to reuse the connection after the timeout.

On Plan 9, I/O operations are interrupted when the process
receives a note. We can rely on this behavior to implement
a more generic approach.

When doing an I/O operation (read or write), we start the I/O in
its own process, then wait for the result asynchronously. The
process is able to handle the "hangup" note. When receiving the
"hangup" note, the currently running I/O operation is canceled
and the process returns.

This way, deadlines can be implemented by sending an "hangup"
note to the process running the blocking I/O operation, after
the expiration of a timer.

Fixes #11932.
Fixes #17498.

Change-Id: I414f72c7a9a4f9b8f9c09ed3b6c269f899d9b430
Reviewed-on: https://go-review.googlesource.com/31521Reviewed-by: default avatarBrad Fitzpatrick <bradfitz@golang.org>
parent 456f2f5c
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package net
import (
"os"
"runtime"
"sync"
"syscall"
)
// asyncIO implements asynchronous cancelable I/O.
// An asyncIO represents a single asynchronous Read or Write
// operation. The result is returned on the result channel.
// The undergoing I/O system call can either complete or be
// interrupted by a note.
type asyncIO struct {
res chan result
// mu guards the pid field.
mu sync.Mutex
// pid holds the process id of
// the process running the IO operation.
pid int
}
// result is the return value of a Read or Write operation.
type result struct {
n int
err error
}
// newAsyncIO returns a new asyncIO that performs an I/O
// operation by calling fn, which must do one and only one
// interruptible system call.
func newAsyncIO(fn func([]byte) (int, error), b []byte) *asyncIO {
aio := &asyncIO{
res: make(chan result, 0),
}
aio.mu.Lock()
go func() {
// Lock the current goroutine to its process
// and store the pid in io so that Cancel can
// interrupt it. We ignore the "hangup" signal,
// so the signal does not take down the entire
// Go runtime.
runtime.LockOSThread()
runtime_ignoreHangup()
aio.pid = os.Getpid()
aio.mu.Unlock()
n, err := fn(b)
aio.mu.Lock()
aio.pid = -1
runtime_unignoreHangup()
aio.mu.Unlock()
aio.res <- result{n, err}
}()
return aio
}
var hangupNote os.Signal = syscall.Note("hangup")
// Cancel interrupts the I/O operation, causing
// the Wait function to return.
func (aio *asyncIO) Cancel() {
aio.mu.Lock()
defer aio.mu.Unlock()
if aio.pid == -1 {
return
}
proc, err := os.FindProcess(aio.pid)
if err != nil {
return
}
proc.Signal(hangupNote)
}
// Wait for the I/O operation to complete.
func (aio *asyncIO) Wait() (int, error) {
res := <-aio.res
return res.n, res.err
}
// The following functions, provided by the runtime, are used to
// ignore and unignore the "hangup" signal received by the process.
func runtime_ignoreHangup()
func runtime_unignoreHangup()
......@@ -7,10 +7,17 @@ package net
import (
"io"
"os"
"sync/atomic"
"syscall"
"time"
)
type atomicBool int32
func (b *atomicBool) isSet() bool { return atomic.LoadInt32((*int32)(b)) != 0 }
func (b *atomicBool) setFalse() { atomic.StoreInt32((*int32)(b), 0) }
func (b *atomicBool) setTrue() { atomic.StoreInt32((*int32)(b), 1) }
// Network file descriptor.
type netFD struct {
// locking/lifetime of sysfd + serialize access to Read and Write methods
......@@ -23,6 +30,14 @@ type netFD struct {
listen, ctl, data *os.File
laddr, raddr Addr
isStream bool
// deadlines
raio *asyncIO
waio *asyncIO
rtimer *time.Timer
wtimer *time.Timer
rtimedout atomicBool // set true when read deadline has been reached
wtimedout atomicBool // set true when write deadline has been reached
}
var (
......@@ -84,6 +99,9 @@ func (fd *netFD) destroy() {
}
func (fd *netFD) Read(b []byte) (n int, err error) {
if fd.rtimedout.isSet() {
return 0, errTimeout
}
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
......@@ -94,10 +112,15 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
if len(b) == 0 {
return 0, nil
}
n, err = fd.data.Read(b)
fd.raio = newAsyncIO(fd.data.Read, b)
n, err = fd.raio.Wait()
fd.raio = nil
if isHangup(err) {
err = io.EOF
}
if isInterrupted(err) {
err = errTimeout
}
if fd.net == "udp" && err == io.EOF {
n = 0
err = nil
......@@ -106,6 +129,9 @@ func (fd *netFD) Read(b []byte) (n int, err error) {
}
func (fd *netFD) Write(b []byte) (n int, err error) {
if fd.wtimedout.isSet() {
return 0, errTimeout
}
if !fd.ok() || fd.data == nil {
return 0, syscall.EINVAL
}
......@@ -113,7 +139,13 @@ func (fd *netFD) Write(b []byte) (n int, err error) {
return 0, err
}
defer fd.writeUnlock()
return fd.data.Write(b)
fd.waio = newAsyncIO(fd.data.Write, b)
n, err = fd.waio.Wait()
fd.waio = nil
if isInterrupted(err) {
err = errTimeout
}
return
}
func (fd *netFD) closeRead() error {
......@@ -185,15 +217,74 @@ func (fd *netFD) file(f *os.File, s string) (*os.File, error) {
}
func (fd *netFD) setDeadline(t time.Time) error {
return syscall.EPLAN9
return setDeadlineImpl(fd, t, 'r'+'w')
}
func (fd *netFD) setReadDeadline(t time.Time) error {
return syscall.EPLAN9
return setDeadlineImpl(fd, t, 'r')
}
func (fd *netFD) setWriteDeadline(t time.Time) error {
return syscall.EPLAN9
return setDeadlineImpl(fd, t, 'w')
}
func setDeadlineImpl(fd *netFD, t time.Time, mode int) error {
d := t.Sub(time.Now())
if mode == 'r' || mode == 'r'+'w' {
fd.rtimedout.setFalse()
}
if mode == 'w' || mode == 'r'+'w' {
fd.wtimedout.setFalse()
}
if t.IsZero() || d < 0 {
// Stop timer
if mode == 'r' || mode == 'r'+'w' {
if fd.rtimer != nil {
fd.rtimer.Stop()
}
fd.rtimer = nil
}
if mode == 'w' || mode == 'r'+'w' {
if fd.wtimer != nil {
fd.wtimer.Stop()
}
fd.wtimer = nil
}
} else {
// Interrupt I/O operation once timer has expired
if mode == 'r' || mode == 'r'+'w' {
fd.rtimer = time.AfterFunc(d, func() {
fd.rtimedout.setTrue()
if fd.raio != nil {
fd.raio.Cancel()
}
})
}
if mode == 'w' || mode == 'r'+'w' {
fd.wtimer = time.AfterFunc(d, func() {
fd.wtimedout.setTrue()
if fd.waio != nil {
fd.waio.Cancel()
}
})
}
}
if !t.IsZero() && d < 0 {
// Interrupt current I/O operation
if mode == 'r' || mode == 'r'+'w' {
fd.rtimedout.setTrue()
if fd.raio != nil {
fd.raio.Cancel()
}
}
if mode == 'w' || mode == 'r'+'w' {
fd.wtimedout.setTrue()
if fd.waio != nil {
fd.waio.Cancel()
}
}
}
return nil
}
func setReadBuffer(fd *netFD, bytes int) error {
......@@ -207,3 +298,7 @@ func setWriteBuffer(fd *netFD, bytes int) error {
func isHangup(err error) bool {
return err != nil && stringsHasSuffix(err.Error(), "Hangup")
}
func isInterrupted(err error) bool {
return err != nil && stringsHasSuffix(err.Error(), "interrupted")
}
......@@ -467,6 +467,11 @@ func TestTCPConcurrentAccept(t *testing.T) {
func TestTCPReadWriteAllocs(t *testing.T) {
switch runtime.GOOS {
case "plan9":
// The implementation of asynchronous cancelable
// I/O on Plan 9 allocates memory.
// See net/fd_io_plan9.go.
t.Skipf("not supported on %s", runtime.GOOS)
case "nacl":
// NaCl needs to allocate pseudo file descriptor
// stuff. See syscall/fd_nacl.go.
......
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package runtime
import (
_ "unsafe"
)
//go:linkname runtime_ignoreHangup net.runtime_ignoreHangup
func runtime_ignoreHangup() {
getg().m.ignoreHangup = true
}
//go:linkname runtime_unignoreHangup net.runtime_unignoreHangup
func runtime_unignoreHangup(sig string) {
getg().m.ignoreHangup = false
}
func ignoredNote(note *byte) bool {
if note == nil {
return false
}
if gostringnocopy(note) != "hangup" {
return false
}
return getg().m.ignoreHangup
}
......@@ -100,6 +100,9 @@ func sighandler(_ureg *ureg, note *byte, gp *g) int {
return _NCONT
}
if flags&_SigNotify != 0 {
if ignoredNote(note) {
return _NCONT
}
if sendNote(note) {
return _NCONT
}
......
......@@ -13,6 +13,7 @@ type mOS struct {
waitsemacount uint32
notesig *int8
errstr *byte
ignoreHangup bool
}
func closefd(fd int32) int32
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment