Commit c05b06a1 authored by Ian Lance Taylor's avatar Ian Lance Taylor

os: use poller for file I/O

This changes the os package to use the runtime poller for file I/O
where possible. When a system call blocks on a pollable descriptor,
the goroutine will be blocked on the poller but the thread will be
released to run other goroutines. When using a non-pollable
descriptor, the os package will continue to use thread-blocking system
calls as before.

For example, on GNU/Linux, the runtime poller uses epoll. epoll does
not support ordinary disk files, so they will continue to use blocking
I/O as before. The poller will be used for pipes.

Since this means that the poller is used for many more programs, this
modifies the runtime to only block waiting for the poller if there is
some goroutine that is waiting on the poller. Otherwise, there is no
point, as the poller will never make any goroutine ready. This
preserves the runtime's current simple deadlock detection.

This seems to crash FreeBSD systems, so it is disabled on FreeBSD.
This is issue 19093.

Using the poller on Windows requires opening the file with
FILE_FLAG_OVERLAPPED. We should only do that if we can remove that
flag if the program calls the Fd method. This is issue 19098.

Update #6817.
Update #7903.
Update #15021.
Update #18507.
Update #19093.
Update #19098.

Change-Id: Ia5197dcefa7c6fbcca97d19a6f8621b2abcbb1fe
Reviewed-on: https://go-review.googlesource.com/36800
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarRuss Cox <rsc@golang.org>
parent 81ec3f6a
...@@ -85,3 +85,7 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error { ...@@ -85,3 +85,7 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
fd.decref() fd.decref()
return nil return nil
} }
func PollDescriptor() uintptr {
return ^uintptr(0)
}
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
func runtimeNano() int64 func runtimeNano() int64
func runtime_pollServerInit() func runtime_pollServerInit()
func runtime_pollServerDescriptor() uintptr
func runtime_pollOpen(fd uintptr) (uintptr, int) func runtime_pollOpen(fd uintptr) (uintptr, int)
func runtime_pollClose(ctx uintptr) func runtime_pollClose(ctx uintptr)
func runtime_pollWait(ctx uintptr, mode int) int func runtime_pollWait(ctx uintptr, mode int) int
...@@ -146,3 +147,9 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error { ...@@ -146,3 +147,9 @@ func setDeadlineImpl(fd *FD, t time.Time, mode int) error {
fd.decref() fd.decref()
return nil return nil
} }
// PollDescriptor returns the descriptor being used by the poller,
// or ^uintptr(0) if there isn't one. This is only used for testing.
func PollDescriptor() uintptr {
return runtime_pollServerDescriptor()
}
...@@ -365,7 +365,19 @@ func (fd *FD) ReadDirent(buf []byte) (int, error) { ...@@ -365,7 +365,19 @@ func (fd *FD) ReadDirent(buf []byte) (int, error) {
return 0, err return 0, err
} }
defer fd.decref() defer fd.decref()
return syscall.ReadDirent(fd.Sysfd, buf) for {
n, err := syscall.ReadDirent(fd.Sysfd, buf)
if err != nil {
n = 0
if err == syscall.EAGAIN {
if err = fd.pd.waitRead(); err == nil {
continue
}
}
}
// Do not call eofError; caller does not expect to see io.EOF.
return n, err
}
} }
// Fchdir wraps syscall.Fchdir. // Fchdir wraps syscall.Fchdir.
......
...@@ -523,13 +523,15 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) { ...@@ -523,13 +523,15 @@ func (fd *FD) Pread(b []byte, off int64) (int, error) {
var done uint32 var done uint32
e = syscall.ReadFile(fd.Sysfd, b, &done, &o) e = syscall.ReadFile(fd.Sysfd, b, &done, &o)
if e != nil { if e != nil {
done = 0
if e == syscall.ERROR_HANDLE_EOF { if e == syscall.ERROR_HANDLE_EOF {
// end of file e = io.EOF
return 0, nil
} }
return 0, e
} }
return int(done), nil if len(b) != 0 {
e = fd.eofError(int(done), e)
}
return int(done), e
} }
func (fd *FD) RecvFrom(buf []byte) (int, syscall.Sockaddr, error) { func (fd *FD) RecvFrom(buf []byte) (int, syscall.Sockaddr, error) {
......
...@@ -8,6 +8,7 @@ package os ...@@ -8,6 +8,7 @@ package os
import ( import (
"io" "io"
"runtime"
"syscall" "syscall"
) )
...@@ -63,9 +64,10 @@ func (f *File) readdirnames(n int) (names []string, err error) { ...@@ -63,9 +64,10 @@ func (f *File) readdirnames(n int) (names []string, err error) {
if d.bufp >= d.nbuf { if d.bufp >= d.nbuf {
d.bufp = 0 d.bufp = 0
var errno error var errno error
d.nbuf, errno = fixCount(syscall.ReadDirent(f.fd, d.buf)) d.nbuf, errno = f.pfd.ReadDirent(d.buf)
runtime.KeepAlive(f)
if errno != nil { if errno != nil {
return names, NewSyscallError("readdirent", errno) return names, wrapSyscallError("readdirent", errno)
} }
if d.nbuf <= 0 { if d.nbuf <= 0 {
break // EOF break // EOF
......
...@@ -6,6 +6,7 @@ package os ...@@ -6,6 +6,7 @@ package os
import ( import (
"io" "io"
"runtime"
"syscall" "syscall"
) )
...@@ -16,7 +17,7 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) { ...@@ -16,7 +17,7 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) {
if !file.isdir() { if !file.isdir() {
return nil, &PathError{"Readdir", file.name, syscall.ENOTDIR} return nil, &PathError{"Readdir", file.name, syscall.ENOTDIR}
} }
if !file.dirinfo.isempty && file.fd == syscall.InvalidHandle { if !file.dirinfo.isempty && file.pfd.Sysfd == syscall.InvalidHandle {
return nil, syscall.EINVAL return nil, syscall.EINVAL
} }
wantAll := n <= 0 wantAll := n <= 0
...@@ -29,7 +30,8 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) { ...@@ -29,7 +30,8 @@ func (file *File) readdir(n int) (fi []FileInfo, err error) {
d := &file.dirinfo.data d := &file.dirinfo.data
for n != 0 && !file.dirinfo.isempty { for n != 0 && !file.dirinfo.isempty {
if file.dirinfo.needdata { if file.dirinfo.needdata {
e := syscall.FindNextFile(file.fd, d) e := file.pfd.FindNextFile(d)
runtime.KeepAlive(file)
if e != nil { if e != nil {
if e == syscall.ERROR_NO_MORE_FILES { if e == syscall.ERROR_NO_MORE_FILES {
break break
......
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build darwin dragonfly freebsd linux nacl netbsd openbsd solaris windows
package os
import "syscall"
// wrapSyscallError takes an error and a syscall name. If the error is
// a syscall.Errno, it wraps it in a os.SyscallError using the syscall name.
func wrapSyscallError(name string, err error) error {
if _, ok := err.(syscall.Errno); ok {
err = NewSyscallError(name, err)
}
return err
}
...@@ -12,6 +12,7 @@ import ( ...@@ -12,6 +12,7 @@ import (
"bytes" "bytes"
"context" "context"
"fmt" "fmt"
"internal/poll"
"internal/testenv" "internal/testenv"
"io" "io"
"io/ioutil" "io/ioutil"
...@@ -369,12 +370,16 @@ var testedAlreadyLeaked = false ...@@ -369,12 +370,16 @@ var testedAlreadyLeaked = false
// basefds returns the number of expected file descriptors // basefds returns the number of expected file descriptors
// to be present in a process at start. // to be present in a process at start.
// stdin, stdout, stderr, epoll/kqueue
func basefds() uintptr { func basefds() uintptr {
return os.Stderr.Fd() + 1 return os.Stderr.Fd() + 1
} }
func closeUnexpectedFds(t *testing.T, m string) { func closeUnexpectedFds(t *testing.T, m string) {
for fd := basefds(); fd <= 101; fd++ { for fd := basefds(); fd <= 101; fd++ {
if fd == poll.PollDescriptor() {
continue
}
err := os.NewFile(fd, "").Close() err := os.NewFile(fd, "").Close()
if err == nil { if err == nil {
t.Logf("%s: Something already leaked - closed fd %d", m, fd) t.Logf("%s: Something already leaked - closed fd %d", m, fd)
...@@ -732,6 +737,9 @@ func TestHelperProcess(*testing.T) { ...@@ -732,6 +737,9 @@ func TestHelperProcess(*testing.T) {
// Now verify that there are no other open fds. // Now verify that there are no other open fds.
var files []*os.File var files []*os.File
for wantfd := basefds() + 1; wantfd <= 100; wantfd++ { for wantfd := basefds() + 1; wantfd <= 100; wantfd++ {
if wantfd == poll.PollDescriptor() {
continue
}
f, err := os.Open(os.Args[0]) f, err := os.Open(os.Args[0])
if err != nil { if err != nil {
fmt.Printf("error opening file with expected fd %d: %v", wantfd, err) fmt.Printf("error opening file with expected fd %d: %v", wantfd, err)
......
...@@ -9,5 +9,4 @@ package os ...@@ -9,5 +9,4 @@ package os
var ( var (
FixLongPath = fixLongPath FixLongPath = fixLongPath
NewConsoleFile = newConsoleFile NewConsoleFile = newConsoleFile
ReadConsoleFunc = &readConsole
) )
...@@ -99,12 +99,13 @@ func (f *File) Read(b []byte) (n int, err error) { ...@@ -99,12 +99,13 @@ func (f *File) Read(b []byte) (n int, err error) {
return 0, err return 0, err
} }
n, e := f.read(b) n, e := f.read(b)
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
}
if e != nil { if e != nil {
if e == io.EOF {
err = e
} else {
err = &PathError{"read", f.name, e} err = &PathError{"read", f.name, e}
} }
}
return n, err return n, err
} }
...@@ -118,11 +119,12 @@ func (f *File) ReadAt(b []byte, off int64) (n int, err error) { ...@@ -118,11 +119,12 @@ func (f *File) ReadAt(b []byte, off int64) (n int, err error) {
} }
for len(b) > 0 { for len(b) > 0 {
m, e := f.pread(b, off) m, e := f.pread(b, off)
if m == 0 && e == nil {
return n, io.EOF
}
if e != nil { if e != nil {
if e == io.EOF {
err = e
} else {
err = &PathError{"read", f.name, e} err = &PathError{"read", f.name, e}
}
break break
} }
n += m n += m
...@@ -226,19 +228,6 @@ func Chdir(dir string) error { ...@@ -226,19 +228,6 @@ func Chdir(dir string) error {
return nil return nil
} }
// Chdir changes the current working directory to the file,
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
if err := f.checkValid("chdir"); err != nil {
return err
}
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{"chdir", f.name, e}
}
return nil
}
// Open opens the named file for reading. If successful, methods on // Open opens the named file for reading. If successful, methods on
// the returned file can be used for reading; the associated file // the returned file can be used for reading; the associated file
// descriptor has mode O_RDONLY. // descriptor has mode O_RDONLY.
...@@ -275,15 +264,3 @@ func fixCount(n int, err error) (int, error) { ...@@ -275,15 +264,3 @@ func fixCount(n int, err error) (int, error) {
} }
return n, err return n, err
} }
// checkValid checks whether f is valid for use.
// If not, it returns an appropriate error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
if f.fd == badFd {
return &PathError{op, f.name, ErrClosed}
}
return nil
}
...@@ -244,14 +244,22 @@ func (f *File) Sync() error { ...@@ -244,14 +244,22 @@ func (f *File) Sync() error {
// read reads up to len(b) bytes from the File. // read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any. // It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) { func (f *File) read(b []byte) (n int, err error) {
return fixCount(syscall.Read(f.fd, b)) n, e := fixCount(syscall.Read(f.fd, b))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
}
return n, e
} }
// pread reads len(b) bytes from the File starting at byte offset off. // pread reads len(b) bytes from the File starting at byte offset off.
// It returns the number of bytes read and the error, if any. // It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil. // EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) { func (f *File) pread(b []byte, off int64) (n int, err error) {
return fixCount(syscall.Pread(f.fd, b, off)) n, e := fixCount(syscall.Pread(f.fd, b, off))
if n == 0 && len(b) > 0 && e == nil {
return 0, io.EOF
}
return n, e
} }
// write writes len(b) bytes to the File. // write writes len(b) bytes to the File.
...@@ -472,3 +480,28 @@ func (f *File) Chown(uid, gid int) error { ...@@ -472,3 +480,28 @@ func (f *File) Chown(uid, gid int) error {
func TempDir() string { func TempDir() string {
return "/tmp" return "/tmp"
} }
// Chdir changes the current working directory to the file,
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
if err := f.checkValid("chdir"); err != nil {
return err
}
if e := syscall.Fchdir(f.fd); e != nil {
return &PathError{"chdir", f.name, e}
}
return nil
}
// checkValid checks whether f is valid for use.
// If not, it returns an appropriate error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
if f.fd == badFd {
return &PathError{op, f.name, ErrClosed}
}
return nil
}
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package os package os
import ( import (
"runtime"
"syscall" "syscall"
"time" "time"
) )
...@@ -60,9 +61,10 @@ func (f *File) Chmod(mode FileMode) error { ...@@ -60,9 +61,10 @@ func (f *File) Chmod(mode FileMode) error {
if err := f.checkValid("chmod"); err != nil { if err := f.checkValid("chmod"); err != nil {
return err return err
} }
if e := syscall.Fchmod(f.fd, syscallMode(mode)); e != nil { if e := f.pfd.Fchmod(syscallMode(mode)); e != nil {
return &PathError{"chmod", f.name, e} return &PathError{"chmod", f.name, e}
} }
runtime.KeepAlive(f)
return nil return nil
} }
...@@ -92,9 +94,10 @@ func (f *File) Chown(uid, gid int) error { ...@@ -92,9 +94,10 @@ func (f *File) Chown(uid, gid int) error {
if err := f.checkValid("chown"); err != nil { if err := f.checkValid("chown"); err != nil {
return err return err
} }
if e := syscall.Fchown(f.fd, uid, gid); e != nil { if e := f.pfd.Fchown(uid, gid); e != nil {
return &PathError{"chown", f.name, e} return &PathError{"chown", f.name, e}
} }
runtime.KeepAlive(f)
return nil return nil
} }
...@@ -105,9 +108,10 @@ func (f *File) Truncate(size int64) error { ...@@ -105,9 +108,10 @@ func (f *File) Truncate(size int64) error {
if err := f.checkValid("truncate"); err != nil { if err := f.checkValid("truncate"); err != nil {
return err return err
} }
if e := syscall.Ftruncate(f.fd, size); e != nil { if e := f.pfd.Ftruncate(size); e != nil {
return &PathError{"truncate", f.name, e} return &PathError{"truncate", f.name, e}
} }
runtime.KeepAlive(f)
return nil return nil
} }
...@@ -118,9 +122,10 @@ func (f *File) Sync() error { ...@@ -118,9 +122,10 @@ func (f *File) Sync() error {
if err := f.checkValid("sync"); err != nil { if err := f.checkValid("sync"); err != nil {
return err return err
} }
if e := syscall.Fsync(f.fd); e != nil { if e := f.pfd.Fsync(); e != nil {
return &PathError{"sync", f.name, e} return &PathError{"sync", f.name, e}
} }
runtime.KeepAlive(f)
return nil return nil
} }
...@@ -139,3 +144,29 @@ func Chtimes(name string, atime time.Time, mtime time.Time) error { ...@@ -139,3 +144,29 @@ func Chtimes(name string, atime time.Time, mtime time.Time) error {
} }
return nil return nil
} }
// Chdir changes the current working directory to the file,
// which must be a directory.
// If there is an error, it will be of type *PathError.
func (f *File) Chdir() error {
if err := f.checkValid("chdir"); err != nil {
return err
}
if e := f.pfd.Fchdir(); e != nil {
return &PathError{"chdir", f.name, e}
}
runtime.KeepAlive(f)
return nil
}
// checkValid checks whether f is valid for use.
// If not, it returns an appropriate error, perhaps incorporating the operation name op.
func (f *File) checkValid(op string) error {
if f == nil {
return ErrInvalid
}
if f.pfd.Sysfd == badFd {
return &PathError{op, f.name, ErrClosed}
}
return nil
}
...@@ -7,6 +7,7 @@ ...@@ -7,6 +7,7 @@
package os package os
import ( import (
"internal/poll"
"runtime" "runtime"
"syscall" "syscall"
) )
...@@ -33,9 +34,10 @@ func rename(oldname, newname string) error { ...@@ -33,9 +34,10 @@ func rename(oldname, newname string) error {
// can overwrite this data, which could cause the finalizer // can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor. // to close the wrong file descriptor.
type file struct { type file struct {
fd int pfd poll.FD
name string name string
dirinfo *dirInfo // nil unless directory being read dirinfo *dirInfo // nil unless directory being read
nonblock bool // whether we set nonblocking mode
} }
// Fd returns the integer Unix file descriptor referencing the open file. // Fd returns the integer Unix file descriptor referencing the open file.
...@@ -44,16 +46,64 @@ func (f *File) Fd() uintptr { ...@@ -44,16 +46,64 @@ func (f *File) Fd() uintptr {
if f == nil { if f == nil {
return ^(uintptr(0)) return ^(uintptr(0))
} }
return uintptr(f.fd)
// If we put the file descriptor into nonblocking mode,
// then set it to blocking mode before we return it,
// because historically we have always returned a descriptor
// opened in blocking mode. The File will continue to work,
// but any blocking operation will tie up a thread.
if f.nonblock {
syscall.SetNonblock(f.pfd.Sysfd, false)
}
return uintptr(f.pfd.Sysfd)
} }
// NewFile returns a new File with the given file descriptor and name. // NewFile returns a new File with the given file descriptor and name.
func NewFile(fd uintptr, name string) *File { func NewFile(fd uintptr, name string) *File {
return newFile(fd, name, false)
}
// newFile is like NewFile, but if pollable is true it tries to add the
// file to the runtime poller.
func newFile(fd uintptr, name string, pollable bool) *File {
fdi := int(fd) fdi := int(fd)
if fdi < 0 { if fdi < 0 {
return nil return nil
} }
f := &File{&file{fd: fdi, name: name}} f := &File{&file{
pfd: poll.FD{
Sysfd: fdi,
IsStream: true,
ZeroReadIsEOF: true,
},
name: name,
}}
// Don't try to use kqueue with regular files on FreeBSD.
// It crashes the system unpredictably while running all.bash.
// Issue 19093.
if runtime.GOOS == "freebsd" {
pollable = false
}
if pollable {
if err := f.pfd.Init(); err != nil {
// An error here indicates a failure to register
// with the netpoll system. That can happen for
// a file descriptor that is not supported by
// epoll/kqueue; for example, disk files on
// GNU/Linux systems. We assume that any real error
// will show up in later I/O.
} else {
// We successfully registered with netpoll, so put
// the file into nonblocking mode.
if err := syscall.SetNonblock(fdi, true); err == nil {
f.nonblock = true
}
}
}
runtime.SetFinalizer(f.file, (*file).close) runtime.SetFinalizer(f.file, (*file).close)
return f return f
} }
...@@ -69,7 +119,7 @@ type dirInfo struct { ...@@ -69,7 +119,7 @@ type dirInfo struct {
// output or standard error. See the SIGPIPE docs in os/signal, and // output or standard error. See the SIGPIPE docs in os/signal, and
// issue 11845. // issue 11845.
func epipecheck(file *File, e error) { func epipecheck(file *File, e error) {
if e == syscall.EPIPE && (file.fd == 1 || file.fd == 2) { if e == syscall.EPIPE && (file.pfd.Sysfd == 1 || file.pfd.Sysfd == 2) {
sigpipe() sigpipe()
} }
} }
...@@ -120,7 +170,7 @@ func OpenFile(name string, flag int, perm FileMode) (*File, error) { ...@@ -120,7 +170,7 @@ func OpenFile(name string, flag int, perm FileMode) (*File, error) {
syscall.CloseOnExec(r) syscall.CloseOnExec(r)
} }
return NewFile(uintptr(r), name), nil return newFile(uintptr(r), name, true), nil
} }
// Close closes the File, rendering it unusable for I/O. // Close closes the File, rendering it unusable for I/O.
...@@ -133,83 +183,51 @@ func (f *File) Close() error { ...@@ -133,83 +183,51 @@ func (f *File) Close() error {
} }
func (file *file) close() error { func (file *file) close() error {
if file == nil || file.fd == badFd { if file == nil || file.pfd.Sysfd == badFd {
return syscall.EINVAL return syscall.EINVAL
} }
var err error var err error
if e := syscall.Close(file.fd); e != nil { if e := file.pfd.Close(); e != nil {
err = &PathError{"close", file.name, e} err = &PathError{"close", file.name, e}
} }
file.fd = -1 // so it can't be closed again file.pfd.Sysfd = badFd // so it can't be closed again
// no need for a finalizer anymore // no need for a finalizer anymore
runtime.SetFinalizer(file, nil) runtime.SetFinalizer(file, nil)
return err return err
} }
// Darwin and FreeBSD can't read or write 2GB+ at a time,
// even on 64-bit systems. See golang.org/issue/7812.
// Use 1GB instead of, say, 2GB-1, to keep subsequent
// reads aligned.
const (
needsMaxRW = runtime.GOOS == "darwin" || runtime.GOOS == "freebsd"
maxRW = 1 << 30
)
// read reads up to len(b) bytes from the File. // read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any. // It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) { func (f *File) read(b []byte) (n int, err error) {
if needsMaxRW && len(b) > maxRW { n, err = f.pfd.Read(b)
b = b[:maxRW] runtime.KeepAlive(f)
} return n, err
return fixCount(syscall.Read(f.fd, b))
} }
// pread reads len(b) bytes from the File starting at byte offset off. // pread reads len(b) bytes from the File starting at byte offset off.
// It returns the number of bytes read and the error, if any. // It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to nil. // EOF is signaled by a zero count with err set to nil.
func (f *File) pread(b []byte, off int64) (n int, err error) { func (f *File) pread(b []byte, off int64) (n int, err error) {
if needsMaxRW && len(b) > maxRW { n, err = f.pfd.Pread(b, off)
b = b[:maxRW] runtime.KeepAlive(f)
} return n, err
return fixCount(syscall.Pread(f.fd, b, off))
} }
// write writes len(b) bytes to the File. // write writes len(b) bytes to the File.
// It returns the number of bytes written and an error, if any. // It returns the number of bytes written and an error, if any.
func (f *File) write(b []byte) (n int, err error) { func (f *File) write(b []byte) (n int, err error) {
for { n, err = f.pfd.Write(b)
bcap := b runtime.KeepAlive(f)
if needsMaxRW && len(bcap) > maxRW {
bcap = bcap[:maxRW]
}
m, err := fixCount(syscall.Write(f.fd, bcap))
n += m
// If the syscall wrote some data but not all (short write)
// or it returned EINTR, then assume it stopped early for
// reasons that are uninteresting to the caller, and try again.
if 0 < m && m < len(bcap) || err == syscall.EINTR {
b = b[m:]
continue
}
if needsMaxRW && len(bcap) != len(b) && err == nil {
b = b[m:]
continue
}
return n, err return n, err
}
} }
// pwrite writes len(b) bytes to the File starting at byte offset off. // pwrite writes len(b) bytes to the File starting at byte offset off.
// It returns the number of bytes written and an error, if any. // It returns the number of bytes written and an error, if any.
func (f *File) pwrite(b []byte, off int64) (n int, err error) { func (f *File) pwrite(b []byte, off int64) (n int, err error) {
if needsMaxRW && len(b) > maxRW { n, err = f.pfd.Pwrite(b, off)
b = b[:maxRW] runtime.KeepAlive(f)
} return n, err
return fixCount(syscall.Pwrite(f.fd, b, off))
} }
// seek sets the offset for the next Read or Write on file to offset, interpreted // seek sets the offset for the next Read or Write on file to offset, interpreted
...@@ -217,7 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) { ...@@ -217,7 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
// relative to the current offset, and 2 means relative to the end. // relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any. // It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) { func (f *File) seek(offset int64, whence int) (ret int64, err error) {
return syscall.Seek(f.fd, offset, whence) ret, err = f.pfd.Seek(offset, whence)
runtime.KeepAlive(f)
return ret, err
} }
// Truncate changes the size of the named file. // Truncate changes the size of the named file.
......
...@@ -5,13 +5,11 @@ ...@@ -5,13 +5,11 @@
package os package os
import ( import (
"internal/poll"
"internal/syscall/windows" "internal/syscall/windows"
"io"
"runtime" "runtime"
"sync"
"syscall" "syscall"
"unicode/utf16" "unicode/utf16"
"unicode/utf8"
"unsafe" "unsafe"
) )
...@@ -20,17 +18,9 @@ import ( ...@@ -20,17 +18,9 @@ import (
// can overwrite this data, which could cause the finalizer // can overwrite this data, which could cause the finalizer
// to close the wrong file descriptor. // to close the wrong file descriptor.
type file struct { type file struct {
fd syscall.Handle pfd poll.FD
name string name string
dirinfo *dirInfo // nil unless directory being read dirinfo *dirInfo // nil unless directory being read
l sync.Mutex // used to implement windows pread/pwrite
// only for console io
isConsole bool
lastbits []byte // first few bytes of the last incomplete rune in last write
readuint16 []uint16 // buffer to hold uint16s obtained with ReadConsole
readbyte []byte // buffer to hold decoding of readuint16 from utf16 to utf8
readbyteOffset int // readbyte[readOffset:] is yet to be consumed with file.Read
} }
// Fd returns the Windows handle referencing the open file. // Fd returns the Windows handle referencing the open file.
...@@ -39,22 +29,39 @@ func (file *File) Fd() uintptr { ...@@ -39,22 +29,39 @@ func (file *File) Fd() uintptr {
if file == nil { if file == nil {
return uintptr(syscall.InvalidHandle) return uintptr(syscall.InvalidHandle)
} }
return uintptr(file.fd) return uintptr(file.pfd.Sysfd)
} }
// newFile returns a new File with the given file handle and name. // newFile returns a new File with the given file handle and name.
// Unlike NewFile, it does not check that h is syscall.InvalidHandle. // Unlike NewFile, it does not check that h is syscall.InvalidHandle.
func newFile(h syscall.Handle, name string) *File { func newFile(h syscall.Handle, name string, kind string) *File {
f := &File{&file{fd: h, name: name}} if kind == "file" {
var m uint32
if syscall.GetConsoleMode(h, &m) == nil {
kind = "console"
}
}
f := &File{&file{
pfd: poll.FD{
Sysfd: h,
IsStream: true,
ZeroReadIsEOF: true,
},
name: name,
}}
runtime.SetFinalizer(f.file, (*file).close) runtime.SetFinalizer(f.file, (*file).close)
// Ignore initialization errors.
// Assume any problems will show up in later I/O.
f.pfd.Init(kind)
return f return f
} }
// newConsoleFile creates new File that will be used as console. // newConsoleFile creates new File that will be used as console.
func newConsoleFile(h syscall.Handle, name string) *File { func newConsoleFile(h syscall.Handle, name string) *File {
f := newFile(h, name) return newFile(h, name, "console")
f.isConsole = true
return f
} }
// NewFile returns a new File with the given file descriptor and name. // NewFile returns a new File with the given file descriptor and name.
...@@ -63,11 +70,7 @@ func NewFile(fd uintptr, name string) *File { ...@@ -63,11 +70,7 @@ func NewFile(fd uintptr, name string) *File {
if h == syscall.InvalidHandle { if h == syscall.InvalidHandle {
return nil return nil
} }
var m uint32 return newFile(h, name, "file")
if syscall.GetConsoleMode(h, &m) == nil {
return newConsoleFile(h, name)
}
return newFile(h, name)
} }
// Auxiliary information if the File describes a directory // Auxiliary information if the File describes a directory
...@@ -90,7 +93,7 @@ func openFile(name string, flag int, perm FileMode) (file *File, err error) { ...@@ -90,7 +93,7 @@ func openFile(name string, flag int, perm FileMode) (file *File, err error) {
if e != nil { if e != nil {
return nil, e return nil, e
} }
return NewFile(uintptr(r), name), nil return newFile(r, name, "file"), nil
} }
func openDir(name string) (file *File, err error) { func openDir(name string) (file *File, err error) {
...@@ -137,7 +140,7 @@ func openDir(name string) (file *File, err error) { ...@@ -137,7 +140,7 @@ func openDir(name string) (file *File, err error) {
return nil, e return nil, e
} }
} }
f := newFile(r, name) f := newFile(r, name, "dir")
f.dirinfo = d f.dirinfo = d
return f, nil return f, nil
} }
...@@ -176,220 +179,55 @@ func (file *File) Close() error { ...@@ -176,220 +179,55 @@ func (file *File) Close() error {
} }
func (file *file) close() error { func (file *file) close() error {
if file == nil { if file == nil || file.pfd.Sysfd == badFd {
return syscall.EINVAL return syscall.EINVAL
} }
if file.isdir() && file.dirinfo.isempty { if file.isdir() && file.dirinfo.isempty {
// "special" empty directories // "special" empty directories
return nil return nil
} }
if file.fd == syscall.InvalidHandle {
return syscall.EINVAL
}
var e error
if file.isdir() {
e = syscall.FindClose(file.fd)
} else {
e = syscall.CloseHandle(file.fd)
}
var err error var err error
if e != nil { if e := file.pfd.Close(); e != nil {
err = &PathError{"close", file.name, e} err = &PathError{"close", file.name, e}
} }
file.fd = badFd // so it can't be closed again file.pfd.Sysfd = badFd // so it can't be closed again
// no need for a finalizer anymore // no need for a finalizer anymore
runtime.SetFinalizer(file, nil) runtime.SetFinalizer(file, nil)
return err return err
} }
var readConsole = syscall.ReadConsole // changed for testing
// readConsole reads utf16 characters from console File,
// encodes them into utf8 and stores them in buffer b.
// It returns the number of utf8 bytes read and an error, if any.
func (f *File) readConsole(b []byte) (n int, err error) {
if len(b) == 0 {
return 0, nil
}
if f.readuint16 == nil {
// Note: syscall.ReadConsole fails for very large buffers.
// The limit is somewhere around (but not exactly) 16384.
// Stay well below.
f.readuint16 = make([]uint16, 0, 10000)
f.readbyte = make([]byte, 0, 4*cap(f.readuint16))
}
for f.readbyteOffset >= len(f.readbyte) {
n := cap(f.readuint16) - len(f.readuint16)
if n > len(b) {
n = len(b)
}
var nw uint32
err := readConsole(f.fd, &f.readuint16[:len(f.readuint16)+1][len(f.readuint16)], uint32(n), &nw, nil)
if err != nil {
return 0, err
}
uint16s := f.readuint16[:len(f.readuint16)+int(nw)]
f.readuint16 = f.readuint16[:0]
buf := f.readbyte[:0]
for i := 0; i < len(uint16s); i++ {
r := rune(uint16s[i])
if utf16.IsSurrogate(r) {
if i+1 == len(uint16s) {
if nw > 0 {
// Save half surrogate pair for next time.
f.readuint16 = f.readuint16[:1]
f.readuint16[0] = uint16(r)
break
}
r = utf8.RuneError
} else {
r = utf16.DecodeRune(r, rune(uint16s[i+1]))
if r != utf8.RuneError {
i++
}
}
}
n := utf8.EncodeRune(buf[len(buf):cap(buf)], r)
buf = buf[:len(buf)+n]
}
f.readbyte = buf
f.readbyteOffset = 0
if nw == 0 {
break
}
}
src := f.readbyte[f.readbyteOffset:]
var i int
for i = 0; i < len(src) && i < len(b); i++ {
x := src[i]
if x == 0x1A { // Ctrl-Z
if i == 0 {
f.readbyteOffset++
}
break
}
b[i] = x
}
f.readbyteOffset += i
return i, nil
}
// read reads up to len(b) bytes from the File. // read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any. // It returns the number of bytes read and an error, if any.
func (f *File) read(b []byte) (n int, err error) { func (f *File) read(b []byte) (n int, err error) {
f.l.Lock() n, err = f.pfd.Read(b)
defer f.l.Unlock() runtime.KeepAlive(f)
if f.isConsole { return n, err
return f.readConsole(b)
}
return fixCount(syscall.Read(f.fd, b))
} }
// pread reads len(b) bytes from the File starting at byte offset off. // pread reads len(b) bytes from the File starting at byte offset off.
// It returns the number of bytes read and the error, if any. // It returns the number of bytes read and the error, if any.
// EOF is signaled by a zero count with err set to 0. // EOF is signaled by a zero count with err set to 0.
func (f *File) pread(b []byte, off int64) (n int, err error) { func (f *File) pread(b []byte, off int64) (n int, err error) {
f.l.Lock() n, err = f.pfd.Pread(b, off)
defer f.l.Unlock() runtime.KeepAlive(f)
curoffset, e := syscall.Seek(f.fd, 0, io.SeekCurrent) return n, err
if e != nil {
return 0, e
}
defer syscall.Seek(f.fd, curoffset, io.SeekStart)
o := syscall.Overlapped{
OffsetHigh: uint32(off >> 32),
Offset: uint32(off),
}
var done uint32
e = syscall.ReadFile(f.fd, b, &done, &o)
if e != nil {
if e == syscall.ERROR_HANDLE_EOF {
// end of file
return 0, nil
}
return 0, e
}
return int(done), nil
}
// writeConsole writes len(b) bytes to the console File.
// It returns the number of bytes written and an error, if any.
func (f *File) writeConsole(b []byte) (n int, err error) {
n = len(b)
runes := make([]rune, 0, 256)
if len(f.lastbits) > 0 {
b = append(f.lastbits, b...)
f.lastbits = nil
}
for len(b) >= utf8.UTFMax || utf8.FullRune(b) {
r, l := utf8.DecodeRune(b)
runes = append(runes, r)
b = b[l:]
}
if len(b) > 0 {
f.lastbits = make([]byte, len(b))
copy(f.lastbits, b)
}
// syscall.WriteConsole seems to fail, if given large buffer.
// So limit the buffer to 16000 characters. This number was
// discovered by experimenting with syscall.WriteConsole.
const maxWrite = 16000
for len(runes) > 0 {
m := len(runes)
if m > maxWrite {
m = maxWrite
}
chunk := runes[:m]
runes = runes[m:]
uint16s := utf16.Encode(chunk)
for len(uint16s) > 0 {
var written uint32
err = syscall.WriteConsole(f.fd, &uint16s[0], uint32(len(uint16s)), &written, nil)
if err != nil {
return 0, nil
}
uint16s = uint16s[written:]
}
}
return n, nil
} }
// write writes len(b) bytes to the File. // write writes len(b) bytes to the File.
// It returns the number of bytes written and an error, if any. // It returns the number of bytes written and an error, if any.
func (f *File) write(b []byte) (n int, err error) { func (f *File) write(b []byte) (n int, err error) {
f.l.Lock() n, err = f.pfd.Write(b)
defer f.l.Unlock() runtime.KeepAlive(f)
if f.isConsole { return n, err
return f.writeConsole(b)
}
return fixCount(syscall.Write(f.fd, b))
} }
// pwrite writes len(b) bytes to the File starting at byte offset off. // pwrite writes len(b) bytes to the File starting at byte offset off.
// It returns the number of bytes written and an error, if any. // It returns the number of bytes written and an error, if any.
func (f *File) pwrite(b []byte, off int64) (n int, err error) { func (f *File) pwrite(b []byte, off int64) (n int, err error) {
f.l.Lock() n, err = f.pfd.Pwrite(b, off)
defer f.l.Unlock() runtime.KeepAlive(f)
curoffset, e := syscall.Seek(f.fd, 0, io.SeekCurrent) return n, err
if e != nil {
return 0, e
}
defer syscall.Seek(f.fd, curoffset, io.SeekStart)
o := syscall.Overlapped{
OffsetHigh: uint32(off >> 32),
Offset: uint32(off),
}
var done uint32
e = syscall.WriteFile(f.fd, b, &done, &o)
if e != nil {
return 0, e
}
return int(done), nil
} }
// seek sets the offset for the next Read or Write on file to offset, interpreted // seek sets the offset for the next Read or Write on file to offset, interpreted
...@@ -397,9 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) { ...@@ -397,9 +235,9 @@ func (f *File) pwrite(b []byte, off int64) (n int, err error) {
// relative to the current offset, and 2 means relative to the end. // relative to the current offset, and 2 means relative to the end.
// It returns the new offset and an error, if any. // It returns the new offset and an error, if any.
func (f *File) seek(offset int64, whence int) (ret int64, err error) { func (f *File) seek(offset int64, whence int) (ret int64, err error) {
f.l.Lock() ret, err = f.pfd.Seek(offset, whence)
defer f.l.Unlock() runtime.KeepAlive(f)
return syscall.Seek(f.fd, offset, whence) return ret, err
} }
// Truncate changes the size of the named file. // Truncate changes the size of the named file.
...@@ -480,7 +318,7 @@ func Pipe() (r *File, w *File, err error) { ...@@ -480,7 +318,7 @@ func Pipe() (r *File, w *File, err error) {
syscall.CloseOnExec(p[1]) syscall.CloseOnExec(p[1])
syscall.ForkLock.RUnlock() syscall.ForkLock.RUnlock()
return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil return newFile(p[0], "|0", "file"), newFile(p[1], "|1", "file"), nil
} }
// TempDir returns the default directory to use for temporary files. // TempDir returns the default directory to use for temporary files.
......
...@@ -17,6 +17,7 @@ import ( ...@@ -17,6 +17,7 @@ import (
"path/filepath" "path/filepath"
"reflect" "reflect"
"runtime" "runtime"
"runtime/debug"
"sort" "sort"
"strings" "strings"
"sync" "sync"
...@@ -112,7 +113,7 @@ func size(name string, t *testing.T) int64 { ...@@ -112,7 +113,7 @@ func size(name string, t *testing.T) int64 {
break break
} }
if e != nil { if e != nil {
t.Fatal("read failed:", err) t.Fatal("read failed:", e)
} }
} }
return int64(len) return int64(len)
...@@ -1940,3 +1941,75 @@ func TestRemoveAllRace(t *testing.T) { ...@@ -1940,3 +1941,75 @@ func TestRemoveAllRace(t *testing.T) {
close(hold) // let workers race to remove root close(hold) // let workers race to remove root
wg.Wait() wg.Wait()
} }
// Test that reading from a pipe doesn't use up a thread.
func TestPipeThreads(t *testing.T) {
switch runtime.GOOS {
case "freebsd":
t.Skip("skipping on FreeBSD; issue 19093")
case "windows":
t.Skip("skipping on Windows; issue 19098")
}
threads := 100
// OpenBSD has a low default for max number of files.
if runtime.GOOS == "openbsd" {
threads = 50
}
r := make([]*File, threads)
w := make([]*File, threads)
for i := 0; i < threads; i++ {
rp, wp, err := Pipe()
if err != nil {
for j := 0; j < i; j++ {
r[j].Close()
w[j].Close()
}
t.Fatal(err)
}
r[i] = rp
w[i] = wp
}
defer debug.SetMaxThreads(debug.SetMaxThreads(threads / 2))
var wg sync.WaitGroup
wg.Add(threads)
c := make(chan bool, threads)
for i := 0; i < threads; i++ {
go func(i int) {
defer wg.Done()
var b [1]byte
c <- true
if _, err := r[i].Read(b[:]); err != nil {
t.Error(err)
}
}(i)
}
for i := 0; i < threads; i++ {
<-c
}
// If we are still alive, it means that the 100 goroutines did
// not require 100 threads.
for i := 0; i < threads; i++ {
if _, err := w[i].Write([]byte{0}); err != nil {
t.Error(err)
}
if err := w[i].Close(); err != nil {
t.Error(err)
}
}
wg.Wait()
for i := 0; i < threads; i++ {
if err := r[i].Close(); err != nil {
t.Error(err)
}
}
}
...@@ -6,6 +6,7 @@ package os_test ...@@ -6,6 +6,7 @@ package os_test
import ( import (
"fmt" "fmt"
"internal/poll"
"internal/syscall/windows" "internal/syscall/windows"
"internal/testenv" "internal/testenv"
"io" "io"
...@@ -643,9 +644,9 @@ func TestStatSymlinkLoop(t *testing.T) { ...@@ -643,9 +644,9 @@ func TestStatSymlinkLoop(t *testing.T) {
} }
func TestReadStdin(t *testing.T) { func TestReadStdin(t *testing.T) {
old := *os.ReadConsoleFunc old := poll.ReadConsole
defer func() { defer func() {
*os.ReadConsoleFunc = old poll.ReadConsole = old
}() }()
testConsole := os.NewConsoleFile(syscall.Stdin, "test") testConsole := os.NewConsoleFile(syscall.Stdin, "test")
...@@ -664,7 +665,7 @@ func TestReadStdin(t *testing.T) { ...@@ -664,7 +665,7 @@ func TestReadStdin(t *testing.T) {
for _, s := range tests { for _, s := range tests {
t.Run(fmt.Sprintf("c%d/r%d/%s", consoleSize, readSize, s), func(t *testing.T) { t.Run(fmt.Sprintf("c%d/r%d/%s", consoleSize, readSize, s), func(t *testing.T) {
s16 := utf16.Encode([]rune(s)) s16 := utf16.Encode([]rune(s))
*os.ReadConsoleFunc = func(h syscall.Handle, buf *uint16, toread uint32, read *uint32, inputControl *byte) error { poll.ReadConsole = func(h syscall.Handle, buf *uint16, toread uint32, read *uint32, inputControl *byte) error {
if inputControl != nil { if inputControl != nil {
t.Fatalf("inputControl not nil") t.Fatalf("inputControl not nil")
} }
......
...@@ -24,5 +24,5 @@ func Pipe() (r *File, w *File, err error) { ...@@ -24,5 +24,5 @@ func Pipe() (r *File, w *File, err error) {
syscall.CloseOnExec(p[1]) syscall.CloseOnExec(p[1])
syscall.ForkLock.RUnlock() syscall.ForkLock.RUnlock()
return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil
} }
...@@ -29,5 +29,5 @@ func Pipe() (r *File, w *File, err error) { ...@@ -29,5 +29,5 @@ func Pipe() (r *File, w *File, err error) {
return nil, nil, NewSyscallError("pipe2", e) return nil, nil, NewSyscallError("pipe2", e)
} }
return NewFile(uintptr(p[0]), "|0"), NewFile(uintptr(p[1]), "|1"), nil return newFile(uintptr(p[0]), "|0", true), newFile(uintptr(p[1]), "|1", true), nil
} }
...@@ -17,7 +17,7 @@ func (f *File) Stat() (FileInfo, error) { ...@@ -17,7 +17,7 @@ func (f *File) Stat() (FileInfo, error) {
return nil, ErrInvalid return nil, ErrInvalid
} }
var fs fileStat var fs fileStat
err := syscall.Fstat(f.fd, &fs.sys) err := f.pfd.Fstat(&fs.sys)
if err != nil { if err != nil {
return nil, &PathError{"stat", f.name, err} return nil, &PathError{"stat", f.name, err}
} }
......
...@@ -16,7 +16,7 @@ func (file *File) Stat() (FileInfo, error) { ...@@ -16,7 +16,7 @@ func (file *File) Stat() (FileInfo, error) {
if file == nil { if file == nil {
return nil, ErrInvalid return nil, ErrInvalid
} }
if file == nil || file.fd < 0 { if file == nil || file.pfd.Sysfd < 0 {
return nil, syscall.EINVAL return nil, syscall.EINVAL
} }
if file.isdir() { if file.isdir() {
...@@ -27,7 +27,7 @@ func (file *File) Stat() (FileInfo, error) { ...@@ -27,7 +27,7 @@ func (file *File) Stat() (FileInfo, error) {
return &devNullStat, nil return &devNullStat, nil
} }
ft, err := syscall.GetFileType(file.fd) ft, err := file.pfd.GetFileType()
if err != nil { if err != nil {
return nil, &PathError{"GetFileType", file.name, err} return nil, &PathError{"GetFileType", file.name, err}
} }
...@@ -37,7 +37,7 @@ func (file *File) Stat() (FileInfo, error) { ...@@ -37,7 +37,7 @@ func (file *File) Stat() (FileInfo, error) {
} }
var d syscall.ByHandleFileInformation var d syscall.ByHandleFileInformation
err = syscall.GetFileInformationByHandle(file.fd, &d) err = file.pfd.GetFileInformationByHandle(&d)
if err != nil { if err != nil {
return nil, &PathError{"GetFileInformationByHandle", file.name, err} return nil, &PathError{"GetFileInformationByHandle", file.name, err}
} }
......
...@@ -79,6 +79,7 @@ type pollCache struct { ...@@ -79,6 +79,7 @@ type pollCache struct {
var ( var (
netpollInited uint32 netpollInited uint32
pollcache pollCache pollcache pollCache
netpollWaiters uint32
) )
//go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit //go:linkname poll_runtime_pollServerInit internal/poll.runtime_pollServerInit
...@@ -91,6 +92,14 @@ func netpollinited() bool { ...@@ -91,6 +92,14 @@ func netpollinited() bool {
return atomic.Load(&netpollInited) != 0 return atomic.Load(&netpollInited) != 0
} }
//go:linkname poll_runtime_pollServerDescriptor internal/poll.runtime_pollServerDescriptor
// poll_runtime_pollServerDescriptor returns the descriptor being used,
// or ^uintptr(0) if the system does not use a poll descriptor.
func poll_runtime_pollServerDescriptor() uintptr {
return netpolldescriptor()
}
//go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen //go:linkname poll_runtime_pollOpen internal/poll.runtime_pollOpen
func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) { func poll_runtime_pollOpen(fd uintptr) (*pollDesc, int) {
pd := pollcache.alloc() pd := pollcache.alloc()
...@@ -244,10 +253,10 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) { ...@@ -244,10 +253,10 @@ func poll_runtime_pollSetDeadline(pd *pollDesc, d int64, mode int) {
} }
unlock(&pd.lock) unlock(&pd.lock)
if rg != nil { if rg != nil {
goready(rg, 3) netpollgoready(rg, 3)
} }
if wg != nil { if wg != nil {
goready(wg, 3) netpollgoready(wg, 3)
} }
} }
...@@ -273,10 +282,10 @@ func poll_runtime_pollUnblock(pd *pollDesc) { ...@@ -273,10 +282,10 @@ func poll_runtime_pollUnblock(pd *pollDesc) {
} }
unlock(&pd.lock) unlock(&pd.lock)
if rg != nil { if rg != nil {
goready(rg, 3) netpollgoready(rg, 3)
} }
if wg != nil { if wg != nil {
goready(wg, 3) netpollgoready(wg, 3)
} }
} }
...@@ -312,7 +321,19 @@ func netpollcheckerr(pd *pollDesc, mode int32) int { ...@@ -312,7 +321,19 @@ func netpollcheckerr(pd *pollDesc, mode int32) int {
} }
func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool { func netpollblockcommit(gp *g, gpp unsafe.Pointer) bool {
return atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp))) r := atomic.Casuintptr((*uintptr)(gpp), pdWait, uintptr(unsafe.Pointer(gp)))
if r {
// Bump the count of goroutines waiting for the poller.
// The scheduler uses this to decide whether to block
// waiting for the poller if there is nothing else to do.
atomic.Xadd(&netpollWaiters, 1)
}
return r
}
func netpollgoready(gp *g, traceskip int) {
atomic.Xadd(&netpollWaiters, -1)
goready(gp, traceskip+1)
} }
// returns true if IO is ready, or false if timedout or closed // returns true if IO is ready, or false if timedout or closed
...@@ -410,10 +431,10 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) { ...@@ -410,10 +431,10 @@ func netpolldeadlineimpl(pd *pollDesc, seq uintptr, read, write bool) {
} }
unlock(&pd.lock) unlock(&pd.lock)
if rg != nil { if rg != nil {
goready(rg, 0) netpollgoready(rg, 0)
} }
if wg != nil { if wg != nil {
goready(wg, 0) netpollgoready(wg, 0)
} }
} }
......
...@@ -36,6 +36,10 @@ func netpollinit() { ...@@ -36,6 +36,10 @@ func netpollinit() {
throw("netpollinit: failed to create descriptor") throw("netpollinit: failed to create descriptor")
} }
func netpolldescriptor() uintptr {
return uintptr(epfd)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
var ev epollevent var ev epollevent
ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET ev.events = _EPOLLIN | _EPOLLOUT | _EPOLLRDHUP | _EPOLLET
......
...@@ -29,6 +29,10 @@ func netpollinit() { ...@@ -29,6 +29,10 @@ func netpollinit() {
closeonexec(kq) closeonexec(kq)
} }
func netpolldescriptor() uintptr {
return uintptr(kq)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
// Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR) // Arm both EVFILT_READ and EVFILT_WRITE in edge-triggered mode (EV_CLEAR)
// for the whole fd lifetime. The notifications are automatically unregistered // for the whole fd lifetime. The notifications are automatically unregistered
......
...@@ -10,6 +10,10 @@ package runtime ...@@ -10,6 +10,10 @@ package runtime
func netpollinit() { func netpollinit() {
} }
func netpolldescriptor() uintptr {
return ^uintptr(0)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
return 0 return 0
} }
......
...@@ -121,6 +121,10 @@ func netpollinit() { ...@@ -121,6 +121,10 @@ func netpollinit() {
throw("netpollinit: failed to create port") throw("netpollinit: failed to create port")
} }
func netpolldescriptor() uintptr {
return uintptr(portfd)
}
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
lock(&pd.lock) lock(&pd.lock)
// We don't register for any specific type of events yet, that's // We don't register for any specific type of events yet, that's
......
...@@ -6,6 +6,8 @@ ...@@ -6,6 +6,8 @@
package runtime package runtime
var netpollWaiters uint32
// Polls for ready network connections. // Polls for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
func netpoll(block bool) (gp *g) { func netpoll(block bool) (gp *g) {
......
...@@ -41,6 +41,10 @@ func netpollinit() { ...@@ -41,6 +41,10 @@ func netpollinit() {
} }
} }
func netpolldescriptor() uintptr {
return iocphandle
}
func netpollopen(fd uintptr, pd *pollDesc) int32 { func netpollopen(fd uintptr, pd *pollDesc) int32 {
if stdcall4(_CreateIoCompletionPort, fd, iocphandle, 0, 0) == 0 { if stdcall4(_CreateIoCompletionPort, fd, iocphandle, 0, 0) == 0 {
return -int32(getlasterror()) return -int32(getlasterror())
......
...@@ -2060,7 +2060,7 @@ stop: ...@@ -2060,7 +2060,7 @@ stop:
} }
// poll network // poll network
if netpollinited() && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 { if _g_.m.p != 0 {
throw("findrunnable: netpoll with p") throw("findrunnable: netpoll with p")
} }
...@@ -2101,7 +2101,7 @@ func pollWork() bool { ...@@ -2101,7 +2101,7 @@ func pollWork() bool {
if !runqempty(p) { if !runqempty(p) {
return true return true
} }
if netpollinited() && sched.lastpoll != 0 { if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
if gp := netpoll(false); gp != nil { if gp := netpoll(false); gp != nil {
injectglist(gp) injectglist(gp)
return true return true
......
...@@ -240,6 +240,7 @@ func TestTraceSymbolize(t *testing.T) { ...@@ -240,6 +240,7 @@ func TestTraceSymbolize(t *testing.T) {
{trace.EvGoSysCall, []frame{ {trace.EvGoSysCall, []frame{
{"syscall.read", 0}, {"syscall.read", 0},
{"syscall.Read", 0}, {"syscall.Read", 0},
{"internal/poll.(*FD).Read", 0},
{"os.(*File).read", 0}, {"os.(*File).read", 0},
{"os.(*File).Read", 0}, {"os.(*File).Read", 0},
{"runtime/trace_test.TestTraceSymbolize.func11", 102}, {"runtime/trace_test.TestTraceSymbolize.func11", 102},
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment