Commit 06ac2627 authored by Ian Lance Taylor's avatar Ian Lance Taylor

runtime: initial scheduler changes for timers on P's

Add support to the main scheduler loop for handling timers on P's.
This is not used yet, as timers are not yet put on P's.

Updates #6239
Updates #27707

Change-Id: I6a359df408629f333a9232142ce19e8be8496dae
Reviewed-on: https://go-review.googlesource.com/c/go/+/171826
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarMichael Knyszek <mknyszek@google.com>
parent ff9f7bc9
...@@ -2221,6 +2221,9 @@ top: ...@@ -2221,6 +2221,9 @@ top:
if _p_.runSafePointFn != 0 { if _p_.runSafePointFn != 0 {
runSafePointFn() runSafePointFn()
} }
now, pollUntil, _ := checkTimers(_p_, 0)
if fingwait && fingwake { if fingwait && fingwake {
if gp := wakefing(); gp != nil { if gp := wakefing(); gp != nil {
ready(gp, 0, true) ready(gp, 0, true)
...@@ -2266,12 +2269,7 @@ top: ...@@ -2266,12 +2269,7 @@ top:
// Steal work from other P's. // Steal work from other P's.
procs := uint32(gomaxprocs) procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 { ranTimer := false
// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
// New work can appear from returning syscall/cgocall, network or timers.
// Neither of that submits to local run queues, so no point in stealing.
goto stop
}
// If number of spinning M's >= number of busy P's, block. // If number of spinning M's >= number of busy P's, block.
// This is necessary to prevent excessive CPU consumption // This is necessary to prevent excessive CPU consumption
// when GOMAXPROCS>>1 but the program parallelism is low. // when GOMAXPROCS>>1 but the program parallelism is low.
...@@ -2288,11 +2286,48 @@ top: ...@@ -2288,11 +2286,48 @@ top:
goto top goto top
} }
stealRunNextG := i > 2 // first look for ready queues with more than 1 g stealRunNextG := i > 2 // first look for ready queues with more than 1 g
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { p2 := allp[enum.position()]
if _p_ == p2 {
continue
}
if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
return gp, false return gp, false
} }
// Consider stealing timers from p2.
// This call to checkTimers is the only place where
// we hold a lock on a different P's timers.
// Lock contention can be a problem here, so avoid
// grabbing the lock if p2 is running and not marked
// for preemption. If p2 is running and not being
// preempted we assume it will handle its own timers.
if i > 2 && shouldStealTimers(p2) {
tnow, w, ran := checkTimers(p2, now)
now = tnow
if w != 0 && (pollUntil == 0 || w < pollUntil) {
pollUntil = w
}
if ran {
// Running the timers may have
// made an arbitrary number of G's
// ready and added them to this P's
// local run queue. That invalidates
// the assumption of runqsteal
// that is always has room to add
// stolen G's. So check now if there
// is a local G to run.
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
ranTimer = true
}
}
} }
} }
if ranTimer {
// Running a timer may have made some goroutine ready.
goto top
}
stop: stop:
...@@ -2309,6 +2344,12 @@ stop: ...@@ -2309,6 +2344,12 @@ stop:
return gp, false return gp, false
} }
delta := int64(-1)
if pollUntil != 0 {
// checkTimers ensures that polluntil > now.
delta = pollUntil - now
}
// wasm only: // wasm only:
// If a callback returned and no other goroutine is awake, // If a callback returned and no other goroutine is awake,
// then pause execution until a callback was triggered. // then pause execution until a callback was triggered.
...@@ -2400,14 +2441,16 @@ stop: ...@@ -2400,14 +2441,16 @@ stop:
} }
// poll network // poll network
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
atomic.Store64(&sched.pollUntil, uint64(pollUntil))
if _g_.m.p != 0 { if _g_.m.p != 0 {
throw("findrunnable: netpoll with p") throw("findrunnable: netpoll with p")
} }
if _g_.m.spinning { if _g_.m.spinning {
throw("findrunnable: netpoll with spinning") throw("findrunnable: netpoll with spinning")
} }
list := netpoll(-1) // block until new work is available list := netpoll(delta) // block until new work is available
atomic.Store64(&sched.pollUntil, 0)
atomic.Store64(&sched.lastpoll, uint64(nanotime())) atomic.Store64(&sched.lastpoll, uint64(nanotime()))
lock(&sched.lock) lock(&sched.lock)
_p_ = pidleget() _p_ = pidleget()
...@@ -2431,6 +2474,11 @@ stop: ...@@ -2431,6 +2474,11 @@ stop:
} }
goto top goto top
} }
} else if pollUntil != 0 && netpollinited() {
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
netpollBreak()
}
} }
stopm() stopm()
goto top goto top
...@@ -2457,6 +2505,22 @@ func pollWork() bool { ...@@ -2457,6 +2505,22 @@ func pollWork() bool {
return false return false
} }
// wakeNetPoller wakes up the thread sleeping in the network poller,
// if there is one, and if it isn't going to wake up anyhow before
// the when argument.
func wakeNetPoller(when int64) {
if atomic.Load64(&sched.lastpoll) == 0 {
// In findrunnable we ensure that when polling the pollUntil
// field is either zero or the time to which the current
// poll is expected to run. This can have a spurious wakeup
// but should never miss a wakeup.
pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
if pollerPollUntil == 0 || pollerPollUntil > when {
netpollBreak()
}
}
}
func resetspinning() { func resetspinning() {
_g_ := getg() _g_ := getg()
if !_g_.m.spinning { if !_g_.m.spinning {
...@@ -2525,10 +2589,20 @@ top: ...@@ -2525,10 +2589,20 @@ top:
gcstopm() gcstopm()
goto top goto top
} }
if _g_.m.p.ptr().runSafePointFn != 0 { pp := _g_.m.p.ptr()
if pp.runSafePointFn != 0 {
runSafePointFn() runSafePointFn()
} }
// Sanity check: if we are spinning, the run queue should be empty.
// Check this before calling checkTimers, as that might call
// goready to put a ready goroutine on the local run queue.
if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
throw("schedule: spinning with local work")
}
checkTimers(pp, 0)
var gp *g var gp *g
var inheritTime bool var inheritTime bool
...@@ -2560,9 +2634,8 @@ top: ...@@ -2560,9 +2634,8 @@ top:
} }
if gp == nil { if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr()) gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning { // We can see gp != nil here even if the M is spinning,
throw("schedule: spinning with local work") // if checkTimers added a local goroutine via goready.
}
} }
if gp == nil { if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available gp, inheritTime = findrunnable() // blocks until work is available
...@@ -2623,6 +2696,60 @@ func dropg() { ...@@ -2623,6 +2696,60 @@ func dropg() {
setGNoWB(&_g_.m.curg, nil) setGNoWB(&_g_.m.curg, nil)
} }
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the current time or 0 if it is not known,
// and the time when the next timer should run or 0 if there is no next timer,
// and reports whether it ran any timers.
// If the time when the next timer should run is not 0,
// it is always larger than the returned time.
// We pass now in and out to avoid extra calls of nanotime.
//go:yeswritebarrierrec
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) {
lock(&pp.timersLock)
adjusttimers(pp)
rnow = now
if len(pp.timers) > 0 {
if rnow == 0 {
rnow = nanotime()
}
for len(pp.timers) > 0 {
if tw := runtimer(pp, rnow); tw != 0 {
if tw > 0 {
pollUntil = tw
}
break
}
ran = true
}
}
unlock(&pp.timersLock)
return rnow, pollUntil, ran
}
// shouldStealTimers reports whether we should try stealing the timers from p2.
// We don't steal timers from a running P that is not marked for preemption,
// on the assumption that it will run its own timers. This reduces
// contention on the timers lock.
func shouldStealTimers(p2 *p) bool {
if p2.status != _Prunning {
return true
}
mp := p2.m.ptr()
if mp == nil || mp.locks > 0 {
return false
}
gp := mp.curg
if gp == nil || gp.atomicstatus != _Grunning || !gp.preempt {
return false
}
return true
}
func parkunlock_c(gp *g, lock unsafe.Pointer) bool { func parkunlock_c(gp *g, lock unsafe.Pointer) bool {
unlock((*mutex)(lock)) unlock((*mutex)(lock))
return true return true
...@@ -4305,6 +4432,13 @@ func checkdead() { ...@@ -4305,6 +4432,13 @@ func checkdead() {
return return
} }
// There are no goroutines running, so we can look at the P's.
for _, _p_ := range allp {
if len(_p_.timers) > 0 {
return
}
}
getg().m.throwing = -1 // do not dump full stacks getg().m.throwing = -1 // do not dump full stacks
throw("all goroutines are asleep - deadlock!") throw("all goroutines are asleep - deadlock!")
} }
...@@ -4392,6 +4526,12 @@ func sysmon() { ...@@ -4392,6 +4526,12 @@ func sysmon() {
incidlelocked(1) incidlelocked(1)
} }
} }
if timeSleepUntil() < now {
// There are timers that should have already run,
// perhaps because there is an unpreemptible P.
// Try to start an M to run them.
startm(nil, false)
}
// retake P's blocked in syscalls // retake P's blocked in syscalls
// and preempt long running G's // and preempt long running G's
if retake(now) != 0 { if retake(now) != 0 {
......
...@@ -598,13 +598,23 @@ type p struct { ...@@ -598,13 +598,23 @@ type p struct {
runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point runSafePointFn uint32 // if 1, run sched.safePointFn at next safe point
// Lock for timers. We normally access the timers while running
// on this P, but the scheduler can also do it from a different P.
timersLock mutex
// Actions to take at some time. This is used to implement the
// standard library's time package.
// Must hold timersLock to access.
timers []*timer
pad cpu.CacheLinePad pad cpu.CacheLinePad
} }
type schedt struct { type schedt struct {
// accessed atomically. keep at top to ensure alignment on 32-bit systems. // accessed atomically. keep at top to ensure alignment on 32-bit systems.
goidgen uint64 goidgen uint64
lastpoll uint64 lastpoll uint64 // time of last network poll, 0 if currently polling
pollUntil uint64 // time to which current poll is sleeping
lock mutex lock mutex
......
...@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) { ...@@ -325,6 +325,27 @@ func timerproc(tb *timersBucket) {
} }
} }
// adjusttimers looks through the timers in the current P's heap for
// any timers that have been modified to run earlier, and puts them in
// the correct place in the heap.
// The caller must have locked the timers for pp.
func adjusttimers(pp *p) {
if len(pp.timers) == 0 {
return
}
throw("adjusttimers: not yet implemented")
}
// runtimer examines the first timer in timers. If it is ready based on now,
// it runs the timer and removes or updates it.
// Returns 0 if it ran a timer, -1 if there are no more timers, or the time
// when the first timer should run.
// The caller must have locked the timers for pp.
func runtimer(pp *p, now int64) int64 {
throw("runtimer: not yet implemented")
return -1
}
func timejump() *g { func timejump() *g {
if faketime == 0 { if faketime == 0 {
return nil return nil
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment