Commit 831e3cfa authored by Ian Lance Taylor's avatar Ian Lance Taylor

runtime: change netpoll to take an amount of time to block

This new facility will be used by future CLs in this series.

Change the only blocking call to netpoll to do the right thing when
netpoll returns an empty list.

Updates #6239
Updates #27707

Change-Id: I58b3c2903eda61a3698b1a4729ed0e81382bb1ed
Reviewed-on: https://go-review.googlesource.com/c/go/+/171821
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarMichael Knyszek <mknyszek@google.com>
Reviewed-by: default avatarEmmanuel Odeke <emm.odeke@gmail.com>
parent 6da300b1
...@@ -8,6 +8,7 @@ const ( ...@@ -8,6 +8,7 @@ const (
_EBADF = 0x9 _EBADF = 0x9
_EFAULT = 0xe _EFAULT = 0xe
_EAGAIN = 0xb _EAGAIN = 0xb
_ETIME = 0x3e
_ETIMEDOUT = 0x91 _ETIMEDOUT = 0x91
_EWOULDBLOCK = 0xb _EWOULDBLOCK = 0xb
_EINPROGRESS = 0x96 _EINPROGRESS = 0x96
......
...@@ -38,6 +38,7 @@ const ( ...@@ -38,6 +38,7 @@ const (
EBADF = C.EBADF EBADF = C.EBADF
EFAULT = C.EFAULT EFAULT = C.EFAULT
EAGAIN = C.EAGAIN EAGAIN = C.EAGAIN
ETIME = C.ETIME
ETIMEDOUT = C.ETIMEDOUT ETIMEDOUT = C.ETIMEDOUT
EWOULDBLOCK = C.EWOULDBLOCK EWOULDBLOCK = C.EWOULDBLOCK
EINPROGRESS = C.EINPROGRESS EINPROGRESS = C.EINPROGRESS
......
...@@ -148,12 +148,27 @@ func netpollarm(pd *pollDesc, mode int) { ...@@ -148,12 +148,27 @@ func netpollarm(pd *pollDesc, mode int) {
unlock(&mtxset) unlock(&mtxset)
} }
// netpoll checks for ready network connections.
// Returns list of goroutines that become runnable.
// delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
//go:nowritebarrierrec //go:nowritebarrierrec
func netpoll(block bool) gList { func netpoll(delay int64) gList {
timeout := ^uintptr(0) var timeout uintptr
if !block { if delay < 0 {
timeout = 0 timeout = ^uintptr(0)
} else if delay == 0 {
// TODO: call poll with timeout == 0
return gList{} return gList{}
} else if delay < 1e6 {
timeout = 1
} else if delay < 1e15 {
timeout = uintptr(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
timeout = 1e9
} }
retry: retry:
lock(&mtxpoll) lock(&mtxpoll)
...@@ -168,6 +183,11 @@ retry: ...@@ -168,6 +183,11 @@ retry:
throw("poll failed") throw("poll failed")
} }
unlock(&mtxset) unlock(&mtxset)
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if timeout > 0 {
return gList{}
}
goto retry goto retry
} }
// Check if some descriptors need to be changed // Check if some descriptors need to be changed
...@@ -203,8 +223,5 @@ retry: ...@@ -203,8 +223,5 @@ retry:
} }
} }
unlock(&mtxset) unlock(&mtxset)
if block && toRun.empty() {
goto retry
}
return toRun return toRun
} }
...@@ -56,15 +56,28 @@ func netpollarm(pd *pollDesc, mode int) { ...@@ -56,15 +56,28 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
// polls for ready network connections // netpoll checks for ready network connections.
// returns list of goroutines that become runnable // Returns list of goroutines that become runnable.
func netpoll(block bool) gList { // delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if epfd == -1 { if epfd == -1 {
return gList{} return gList{}
} }
waitms := int32(-1) var waitms int32
if !block { if delay < 0 {
waitms = -1
} else if delay == 0 {
waitms = 0 waitms = 0
} else if delay < 1e6 {
waitms = 1
} else if delay < 1e15 {
waitms = int32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
waitms = 1e9
} }
var events [128]epollevent var events [128]epollevent
retry: retry:
...@@ -74,6 +87,11 @@ retry: ...@@ -74,6 +87,11 @@ retry:
println("runtime: epollwait on fd", epfd, "failed with", -n) println("runtime: epollwait on fd", epfd, "failed with", -n)
throw("runtime: netpoll failed") throw("runtime: netpoll failed")
} }
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if waitms > 0 {
return gList{}
}
goto retry goto retry
} }
var toRun gList var toRun gList
...@@ -98,8 +116,5 @@ retry: ...@@ -98,8 +116,5 @@ retry:
netpollready(&toRun, pd, mode) netpollready(&toRun, pd, mode)
} }
} }
if block && toRun.empty() {
goto retry
}
return toRun return toRun
} }
...@@ -27,6 +27,6 @@ func netpollclose(fd uintptr) int32 { ...@@ -27,6 +27,6 @@ func netpollclose(fd uintptr) int32 {
func netpollarm(pd *pollDesc, mode int) { func netpollarm(pd *pollDesc, mode int) {
} }
func netpoll(block bool) gList { func netpoll(delay int64) gList {
return gList{} return gList{}
} }
...@@ -57,15 +57,27 @@ func netpollarm(pd *pollDesc, mode int) { ...@@ -57,15 +57,27 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
// Polls for ready network connections. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
func netpoll(block bool) gList { // delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if kq == -1 { if kq == -1 {
return gList{} return gList{}
} }
var tp *timespec var tp *timespec
var ts timespec var ts timespec
if !block { if delay < 0 {
tp = nil
} else if delay == 0 {
tp = &ts
} else {
ts.setNsec(delay)
if ts.tv_sec > 1e6 {
// Darwin returns EINVAL if the sleep time is too long.
ts.tv_sec = 1e6
}
tp = &ts tp = &ts
} }
var events [64]keventt var events [64]keventt
...@@ -76,6 +88,11 @@ retry: ...@@ -76,6 +88,11 @@ retry:
println("runtime: kevent on fd", kq, "failed with", -n) println("runtime: kevent on fd", kq, "failed with", -n)
throw("runtime: netpoll failed") throw("runtime: netpoll failed")
} }
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if delay > 0 {
return gList{}
}
goto retry goto retry
} }
var toRun gList var toRun gList
...@@ -110,8 +127,5 @@ retry: ...@@ -110,8 +127,5 @@ retry:
netpollready(&toRun, pd, mode) netpollready(&toRun, pd, mode)
} }
} }
if block && toRun.empty() {
goto retry
}
return toRun return toRun
} }
...@@ -178,27 +178,45 @@ func netpollarm(pd *pollDesc, mode int) { ...@@ -178,27 +178,45 @@ func netpollarm(pd *pollDesc, mode int) {
unlock(&pd.lock) unlock(&pd.lock)
} }
// polls for ready network connections // netpoll checks for ready network connections.
// returns list of goroutines that become runnable // Returns list of goroutines that become runnable.
func netpoll(block bool) gList { // delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
if portfd == -1 { if portfd == -1 {
return gList{} return gList{}
} }
var wait *timespec var wait *timespec
var zero timespec var ts timespec
if !block { if delay < 0 {
wait = &zero wait = nil
} else if delay == 0 {
wait = &ts
} else {
ts.setNsec(delay)
if ts.tv_sec > 1e6 {
// An arbitrary cap on how long to wait for a timer.
// 1e6 s == ~11.5 days.
ts.tv_sec = 1e6
}
wait = &ts
} }
var events [128]portevent var events [128]portevent
retry: retry:
var n uint32 = 1 var n uint32 = 1
if port_getn(portfd, &events[0], uint32(len(events)), &n, wait) < 0 { if port_getn(portfd, &events[0], uint32(len(events)), &n, wait) < 0 {
if e := errno(); e != _EINTR { if e := errno(); e != _EINTR && e != _ETIME {
print("runtime: port_getn on fd ", portfd, " failed (errno=", e, ")\n") print("runtime: port_getn on fd ", portfd, " failed (errno=", e, ")\n")
throw("runtime: netpoll failed") throw("runtime: netpoll failed")
} }
// If a timed sleep was interrupted, just return to
// recalculate how long we should sleep now.
if delay > 0 {
return gList{}
}
goto retry goto retry
} }
...@@ -242,8 +260,5 @@ retry: ...@@ -242,8 +260,5 @@ retry:
} }
} }
if block && toRun.empty() {
goto retry
}
return toRun return toRun
} }
...@@ -10,7 +10,7 @@ var netpollWaiters uint32 ...@@ -10,7 +10,7 @@ var netpollWaiters uint32
// Polls for ready network connections. // Polls for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
func netpoll(block bool) gList { func netpoll(delay int64) gList {
// Implementation for platforms that do not support // Implementation for platforms that do not support
// integrated network poller. // integrated network poller.
return gList{} return gList{}
......
...@@ -61,9 +61,12 @@ func netpollarm(pd *pollDesc, mode int) { ...@@ -61,9 +61,12 @@ func netpollarm(pd *pollDesc, mode int) {
throw("runtime: unused") throw("runtime: unused")
} }
// Polls for completed network IO. // netpoll checks for ready network connections.
// Returns list of goroutines that become runnable. // Returns list of goroutines that become runnable.
func netpoll(block bool) gList { // delay < 0: blocks indefinitely
// delay == 0: does not block, just polls
// delay > 0: block for up to that many nanoseconds
func netpoll(delay int64) gList {
var entries [64]overlappedEntry var entries [64]overlappedEntry
var wait, qty, key, flags, n, i uint32 var wait, qty, key, flags, n, i uint32
var errno int32 var errno int32
...@@ -75,23 +78,32 @@ func netpoll(block bool) gList { ...@@ -75,23 +78,32 @@ func netpoll(block bool) gList {
if iocphandle == _INVALID_HANDLE_VALUE { if iocphandle == _INVALID_HANDLE_VALUE {
return gList{} return gList{}
} }
wait = 0 if delay < 0 {
if block {
wait = _INFINITE wait = _INFINITE
} else if delay == 0 {
wait = 0
} else if delay < 1e6 {
wait = 1
} else if delay < 1e15 {
wait = uint32(delay / 1e6)
} else {
// An arbitrary cap on how long to wait for a timer.
// 1e9 ms == ~11.5 days.
wait = 1e9
} }
retry:
if _GetQueuedCompletionStatusEx != nil { if _GetQueuedCompletionStatusEx != nil {
n = uint32(len(entries) / int(gomaxprocs)) n = uint32(len(entries) / int(gomaxprocs))
if n < 8 { if n < 8 {
n = 8 n = 8
} }
if block { if delay != 0 {
mp.blocked = true mp.blocked = true
} }
if stdcall6(_GetQueuedCompletionStatusEx, iocphandle, uintptr(unsafe.Pointer(&entries[0])), uintptr(n), uintptr(unsafe.Pointer(&n)), uintptr(wait), 0) == 0 { if stdcall6(_GetQueuedCompletionStatusEx, iocphandle, uintptr(unsafe.Pointer(&entries[0])), uintptr(n), uintptr(unsafe.Pointer(&n)), uintptr(wait), 0) == 0 {
mp.blocked = false mp.blocked = false
errno = int32(getlasterror()) errno = int32(getlasterror())
if !block && errno == _WAIT_TIMEOUT { if errno == _WAIT_TIMEOUT {
return gList{} return gList{}
} }
println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")") println("runtime: GetQueuedCompletionStatusEx failed (errno=", errno, ")")
...@@ -111,13 +123,13 @@ retry: ...@@ -111,13 +123,13 @@ retry:
op = nil op = nil
errno = 0 errno = 0
qty = 0 qty = 0
if block { if delay != 0 {
mp.blocked = true mp.blocked = true
} }
if stdcall5(_GetQueuedCompletionStatus, iocphandle, uintptr(unsafe.Pointer(&qty)), uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&op)), uintptr(wait)) == 0 { if stdcall5(_GetQueuedCompletionStatus, iocphandle, uintptr(unsafe.Pointer(&qty)), uintptr(unsafe.Pointer(&key)), uintptr(unsafe.Pointer(&op)), uintptr(wait)) == 0 {
mp.blocked = false mp.blocked = false
errno = int32(getlasterror()) errno = int32(getlasterror())
if !block && errno == _WAIT_TIMEOUT { if errno == _WAIT_TIMEOUT {
return gList{} return gList{}
} }
if op == nil { if op == nil {
...@@ -129,9 +141,6 @@ retry: ...@@ -129,9 +141,6 @@ retry:
mp.blocked = false mp.blocked = false
handlecompletion(&toRun, op, errno, qty) handlecompletion(&toRun, op, errno, qty)
} }
if block && toRun.empty() {
goto retry
}
return toRun return toRun
} }
......
...@@ -1116,7 +1116,7 @@ func stopTheWorldWithSema() { ...@@ -1116,7 +1116,7 @@ func stopTheWorldWithSema() {
func startTheWorldWithSema(emitTraceEvent bool) int64 { func startTheWorldWithSema(emitTraceEvent bool) int64 {
mp := acquirem() // disable preemption because it can be holding p in a local var mp := acquirem() // disable preemption because it can be holding p in a local var
if netpollinited() { if netpollinited() {
list := netpoll(false) // non-blocking list := netpoll(0) // non-blocking
injectglist(&list) injectglist(&list)
} }
lock(&sched.lock) lock(&sched.lock)
...@@ -2252,7 +2252,7 @@ top: ...@@ -2252,7 +2252,7 @@ top:
// not set lastpoll yet), this thread will do blocking netpoll below // not set lastpoll yet), this thread will do blocking netpoll below
// anyway. // anyway.
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { // non-blocking if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop() gp := list.pop()
injectglist(&list) injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable) casgstatus(gp, _Gwaiting, _Grunnable)
...@@ -2406,14 +2406,16 @@ stop: ...@@ -2406,14 +2406,16 @@ stop:
if _g_.m.spinning { if _g_.m.spinning {
throw("findrunnable: netpoll with spinning") throw("findrunnable: netpoll with spinning")
} }
list := netpoll(true) // block until new work is available list := netpoll(-1) // block until new work is available
atomic.Store64(&sched.lastpoll, uint64(nanotime())) atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if !list.empty() { lock(&sched.lock)
lock(&sched.lock) _p_ = pidleget()
_p_ = pidleget() unlock(&sched.lock)
unlock(&sched.lock) if _p_ == nil {
if _p_ != nil { injectglist(&list)
acquirep(_p_) } else {
acquirep(_p_)
if !list.empty() {
gp := list.pop() gp := list.pop()
injectglist(&list) injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable) casgstatus(gp, _Gwaiting, _Grunnable)
...@@ -2422,7 +2424,11 @@ stop: ...@@ -2422,7 +2424,11 @@ stop:
} }
return gp, false return gp, false
} }
injectglist(&list) if wasSpinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
} }
} }
stopm() stopm()
...@@ -2442,7 +2448,7 @@ func pollWork() bool { ...@@ -2442,7 +2448,7 @@ func pollWork() bool {
return true return true
} }
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 { if netpollinited() && atomic.Load(&netpollWaiters) > 0 && sched.lastpoll != 0 {
if list := netpoll(false); !list.empty() { if list := netpoll(0); !list.empty() {
injectglist(&list) injectglist(&list)
return true return true
} }
...@@ -4371,7 +4377,7 @@ func sysmon() { ...@@ -4371,7 +4377,7 @@ func sysmon() {
now := nanotime() now := nanotime()
if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
list := netpoll(false) // non-blocking - returns list of goroutines list := netpoll(0) // non-blocking - returns list of goroutines
if !list.empty() { if !list.empty() {
// Need to decrement number of idle locked M's // Need to decrement number of idle locked M's
// (pretending that one more is running) before injectglist. // (pretending that one more is running) before injectglist.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment