Commit ed738ad3 authored by Mikio Hara's avatar Mikio Hara

net: remove obsolete builtin network pollster

Update #5199
Update #6146

R=golang-dev, dvyukov
CC=golang-dev
https://golang.org/cl/13112044
parent 9b65dac4
// Copyright 2009 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build ignore
// Waiting for FDs via kqueue/kevent.
package net
import (
"os"
"syscall"
)
type pollster struct {
kq int
eventbuf [10]syscall.Kevent_t
events []syscall.Kevent_t
// An event buffer for AddFD/DelFD.
// Must hold pollServer lock.
kbuf [1]syscall.Kevent_t
}
func newpollster() (p *pollster, err error) {
p = new(pollster)
if p.kq, err = syscall.Kqueue(); err != nil {
return nil, os.NewSyscallError("kqueue", err)
}
syscall.CloseOnExec(p.kq)
p.events = p.eventbuf[0:0]
return p, nil
}
// First return value is whether the pollServer should be woken up.
// This version always returns false.
func (p *pollster) AddFD(fd int, mode int, repeat bool) (bool, error) {
// pollServer is locked.
var kmode int
if mode == 'r' {
kmode = syscall.EVFILT_READ
} else {
kmode = syscall.EVFILT_WRITE
}
ev := &p.kbuf[0]
// EV_ADD - add event to kqueue list
// EV_ONESHOT - delete the event the first time it triggers
flags := syscall.EV_ADD
if !repeat {
flags |= syscall.EV_ONESHOT
}
syscall.SetKevent(ev, fd, kmode, flags)
n, err := syscall.Kevent(p.kq, p.kbuf[:], nil, nil)
if err != nil {
return false, os.NewSyscallError("kevent", err)
}
if n != 1 || (ev.Flags&syscall.EV_ERROR) == 0 || int(ev.Ident) != fd || int(ev.Filter) != kmode {
return false, os.NewSyscallError("kqueue phase error", err)
}
if ev.Data != 0 {
return false, syscall.Errno(int(ev.Data))
}
return false, nil
}
// Return value is whether the pollServer should be woken up.
// This version always returns false.
func (p *pollster) DelFD(fd int, mode int) bool {
// pollServer is locked.
var kmode int
if mode == 'r' {
kmode = syscall.EVFILT_READ
} else {
kmode = syscall.EVFILT_WRITE
}
ev := &p.kbuf[0]
// EV_DELETE - delete event from kqueue list
syscall.SetKevent(ev, fd, kmode, syscall.EV_DELETE)
syscall.Kevent(p.kq, p.kbuf[:], nil, nil)
return false
}
func (p *pollster) WaitFD(s *pollServer, nsec int64) (fd int, mode int, err error) {
var t *syscall.Timespec
for len(p.events) == 0 {
if nsec > 0 {
if t == nil {
t = new(syscall.Timespec)
}
*t = syscall.NsecToTimespec(nsec)
}
s.Unlock()
n, err := syscall.Kevent(p.kq, nil, p.eventbuf[:], t)
s.Lock()
if err != nil {
if err == syscall.EINTR {
continue
}
return -1, 0, os.NewSyscallError("kevent", err)
}
if n == 0 {
return -1, 0, nil
}
p.events = p.eventbuf[:n]
}
ev := &p.events[0]
p.events = p.events[1:]
fd = int(ev.Ident)
if ev.Filter == syscall.EVFILT_READ {
mode = 'r'
} else {
mode = 'w'
}
return fd, mode, nil
}
func (p *pollster) Close() error { return os.NewSyscallError("close", syscall.Close(p.kq)) }
// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build ignore
package net
import (
"os"
"runtime"
"sync"
"syscall"
"time"
)
// A pollServer helps FDs determine when to retry a non-blocking
// read or write after they get EAGAIN. When an FD needs to wait,
// call s.WaitRead() or s.WaitWrite() to pass the request to the poll server.
// When the pollServer finds that i/o on FD should be possible
// again, it will send on fd.cr/fd.cw to wake any waiting goroutines.
//
// To avoid races in closing, all fd operations are locked and
// refcounted. when netFD.Close() is called, it calls syscall.Shutdown
// and sets a closing flag. Only when the last reference is removed
// will the fd be closed.
type pollServer struct {
pr, pw *os.File
poll *pollster // low-level OS hooks
sync.Mutex // controls pending and deadline
pending map[int]*pollDesc
deadline int64 // next deadline (nsec since 1970)
}
// A pollDesc contains netFD state related to pollServer.
type pollDesc struct {
// immutable after Init()
pollServer *pollServer
sysfd int
cr, cw chan error
// mutable, protected by pollServer mutex
closing bool
ncr, ncw int
// mutable, safe for concurrent access
rdeadline, wdeadline deadline
}
func newPollServer() (s *pollServer, err error) {
s = new(pollServer)
if s.pr, s.pw, err = os.Pipe(); err != nil {
return nil, err
}
if err = syscall.SetNonblock(int(s.pr.Fd()), true); err != nil {
goto Errno
}
if err = syscall.SetNonblock(int(s.pw.Fd()), true); err != nil {
goto Errno
}
if s.poll, err = newpollster(); err != nil {
goto Error
}
if _, err = s.poll.AddFD(int(s.pr.Fd()), 'r', true); err != nil {
s.poll.Close()
goto Error
}
s.pending = make(map[int]*pollDesc)
go s.Run()
return s, nil
Errno:
err = &os.PathError{
Op: "setnonblock",
Path: s.pr.Name(),
Err: err,
}
Error:
s.pr.Close()
s.pw.Close()
return nil, err
}
func (s *pollServer) AddFD(pd *pollDesc, mode int) error {
s.Lock()
intfd := pd.sysfd
if intfd < 0 || pd.closing {
// fd closed underfoot
s.Unlock()
return errClosing
}
var t int64
key := intfd << 1
if mode == 'r' {
pd.ncr++
t = pd.rdeadline.value()
} else {
pd.ncw++
key++
t = pd.wdeadline.value()
}
s.pending[key] = pd
doWakeup := false
if t > 0 && (s.deadline == 0 || t < s.deadline) {
s.deadline = t
doWakeup = true
}
wake, err := s.poll.AddFD(intfd, mode, false)
s.Unlock()
if err != nil {
return err
}
if wake || doWakeup {
s.Wakeup()
}
return nil
}
// Evict evicts pd from the pending list, unblocking
// any I/O running on pd. The caller must have locked
// pollserver.
// Return value is whether the pollServer should be woken up.
func (s *pollServer) Evict(pd *pollDesc) bool {
pd.closing = true
doWakeup := false
if s.pending[pd.sysfd<<1] == pd {
s.WakeFD(pd, 'r', errClosing)
if s.poll.DelFD(pd.sysfd, 'r') {
doWakeup = true
}
delete(s.pending, pd.sysfd<<1)
}
if s.pending[pd.sysfd<<1|1] == pd {
s.WakeFD(pd, 'w', errClosing)
if s.poll.DelFD(pd.sysfd, 'w') {
doWakeup = true
}
delete(s.pending, pd.sysfd<<1|1)
}
return doWakeup
}
var wakeupbuf [1]byte
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
func (s *pollServer) LookupFD(fd int, mode int) *pollDesc {
key := fd << 1
if mode == 'w' {
key++
}
netfd, ok := s.pending[key]
if !ok {
return nil
}
delete(s.pending, key)
return netfd
}
func (s *pollServer) WakeFD(pd *pollDesc, mode int, err error) {
if mode == 'r' {
for pd.ncr > 0 {
pd.ncr--
pd.cr <- err
}
} else {
for pd.ncw > 0 {
pd.ncw--
pd.cw <- err
}
}
}
func (s *pollServer) CheckDeadlines() {
now := time.Now().UnixNano()
// TODO(rsc): This will need to be handled more efficiently,
// probably with a heap indexed by wakeup time.
var nextDeadline int64
for key, pd := range s.pending {
var t int64
var mode int
if key&1 == 0 {
mode = 'r'
} else {
mode = 'w'
}
if mode == 'r' {
t = pd.rdeadline.value()
} else {
t = pd.wdeadline.value()
}
if t > 0 {
if t <= now {
delete(s.pending, key)
s.poll.DelFD(pd.sysfd, mode)
s.WakeFD(pd, mode, errTimeout)
} else if nextDeadline == 0 || t < nextDeadline {
nextDeadline = t
}
}
}
s.deadline = nextDeadline
}
func (s *pollServer) Run() {
var scratch [100]byte
s.Lock()
defer s.Unlock()
for {
var timeout int64 // nsec to wait for or 0 for none
if s.deadline > 0 {
timeout = s.deadline - time.Now().UnixNano()
if timeout <= 0 {
s.CheckDeadlines()
continue
}
}
fd, mode, err := s.poll.WaitFD(s, timeout)
if err != nil {
print("pollServer WaitFD: ", err.Error(), "\n")
return
}
if fd < 0 {
// Timeout happened.
s.CheckDeadlines()
continue
}
if fd == int(s.pr.Fd()) {
// Drain our wakeup pipe (we could loop here,
// but it's unlikely that there are more than
// len(scratch) wakeup calls).
s.pr.Read(scratch[0:])
s.CheckDeadlines()
} else {
pd := s.LookupFD(fd, mode)
if pd == nil {
// This can happen because the WaitFD runs without
// holding s's lock, so there might be a pending wakeup
// for an fd that has been evicted. No harm done.
continue
}
s.WakeFD(pd, mode, nil)
}
}
}
func (pd *pollDesc) Close() {
}
func (pd *pollDesc) Lock() {
if pd.pollServer == nil {
return
}
pd.pollServer.Lock()
}
func (pd *pollDesc) Unlock() {
if pd.pollServer == nil {
return
}
pd.pollServer.Unlock()
}
func (pd *pollDesc) Wakeup() {
if pd.pollServer == nil {
return
}
pd.pollServer.Wakeup()
}
func (pd *pollDesc) PrepareRead() error {
if pd.rdeadline.expired() {
return errTimeout
}
return nil
}
func (pd *pollDesc) PrepareWrite() error {
if pd.wdeadline.expired() {
return errTimeout
}
return nil
}
func (pd *pollDesc) WaitRead() error {
err := pd.pollServer.AddFD(pd, 'r')
if err == nil {
err = <-pd.cr
}
return err
}
func (pd *pollDesc) WaitWrite() error {
err := pd.pollServer.AddFD(pd, 'w')
if err == nil {
err = <-pd.cw
}
return err
}
func (pd *pollDesc) Evict() bool {
if pd.pollServer == nil {
return false
}
return pd.pollServer.Evict(pd)
}
// Spread network FDs over several pollServers.
var pollMaxN int
var pollservers []*pollServer
var startServersOnce []func()
func init() {
pollMaxN = runtime.NumCPU()
if pollMaxN > 8 {
pollMaxN = 8 // No improvement then.
}
pollservers = make([]*pollServer, pollMaxN)
startServersOnce = make([]func(), pollMaxN)
for i := 0; i < pollMaxN; i++ {
k := i
once := new(sync.Once)
startServersOnce[i] = func() { once.Do(func() { startServer(k) }) }
}
}
func startServer(k int) {
p, err := newPollServer()
if err != nil {
panic(err)
}
pollservers[k] = p
}
func (pd *pollDesc) Init(fd *netFD) error {
pollN := runtime.GOMAXPROCS(0)
if pollN > pollMaxN {
pollN = pollMaxN
}
k := fd.sysfd % pollN
startServersOnce[k]()
pd.sysfd = fd.sysfd
pd.pollServer = pollservers[k]
pd.cr = make(chan error, 1)
pd.cw = make(chan error, 1)
return nil
}
func (fd *netFD) setDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, true, true)
}
func (fd *netFD) setReadDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, true, false)
}
func (fd *netFD) setWriteDeadline(t time.Time) error {
return setDeadlineImpl(fd, t, false, true)
}
func setDeadlineImpl(fd *netFD, t time.Time, read, write bool) error {
if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
if read {
fd.pd.rdeadline.setTime(t)
}
if write {
fd.pd.wdeadline.setTime(t)
}
fd.pd.Wakeup()
return nil
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment