Commit e870f06c authored by Austin Clements

runtime: yield time slice to most recently readied G

Currently, when the runtime ready()s a G, it adds it to the end of the
current P's run queue and continues running. If there are many other
things in the run queue, this can result in a significant delay before
the ready()d G actually runs and can hurt fairness when other Gs in
the run queue are CPU hogs. For example, if there are three Gs sharing
a P, one of which is a CPU hog that never voluntarily gives up the P
and the other two of which are doing small amounts of work and
communicating back and forth on an unbuffered channel, the two
communicating Gs will get very little CPU time.

Change this so that when G1 ready()s G2 and then blocks, the scheduler
immediately hands off the remainder of G1's time slice to G2. In the
above example, the two communicating Gs will now act as a unit and
together get half of the CPU time, while the CPU hog gets the other
half of the CPU time.
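For concreteness, here is a minimal sketch of the scenario described above (illustrative only: the names and constants are invented, and this is not the benchmark from this commit). With GOMAXPROCS=1, all three goroutines share one P; before this change the ping-pong pair makes very little progress relative to the hog, while with the runnext handoff the pair is co-scheduled:

    package main

    import (
    	"fmt"
    	"runtime"
    	"sync/atomic"
    	"time"
    )

    func main() {
    	runtime.GOMAXPROCS(1) // all goroutines share a single P

    	var hogOps, pingOps int64

    	// CPU hog: long bursts of work with only occasional yields.
    	go func() {
    		for {
    			for i := 0; i < 1e6; i++ {
    				atomic.AddInt64(&hogOps, 1)
    			}
    			runtime.Gosched()
    		}
    	}()

    	// A pair ping-ponging on an unbuffered channel: each send
    	// readies the partner and then blocks.
    	ping := make(chan int)
    	go func() {
    		for v := range ping {
    			ping <- v // echo
    		}
    	}()
    	go func() {
    		for {
    			ping <- 0
    			<-ping
    			atomic.AddInt64(&pingOps, 1)
    		}
    	}()

    	time.Sleep(100 * time.Millisecond)
    	fmt.Println("hog ops:", atomic.LoadInt64(&hogOps),
    		"ping-pongs:", atomic.LoadInt64(&pingOps))
    }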

This fixes the problem demonstrated by the ping-pong benchmark added
in the previous commit:

benchmark                old ns/op     new ns/op     delta
BenchmarkPingPongHog     684287        825           -99.88%

On the x/benchmarks suite, this change improves the performance of
garbage by ~6% (for GOMAXPROCS=1 and 4), and json by 28% and 36% for
GOMAXPROCS=1 and 4. It has negligible effect on heap size.

This has no effect on the go1 benchmark suite since those benchmarks
are mostly single-threaded.

Change-Id: I858a08eaa78f702ea98a5fac99d28a4ac91d339f
Reviewed-on: https://go-review.googlesource.com/9289
Reviewed-by: Rick Hudson <rlh@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
parent da0e37fa
--- a/src/runtime/proc1.go
+++ b/src/runtime/proc1.go
@@ -145,7 +145,7 @@ func ready(gp *g, traceskip int) {
 	// status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
 	casgstatus(gp, _Gwaiting, _Grunnable)
-	runqput(_g_.m.p.ptr(), gp)
+	runqput(_g_.m.p.ptr(), gp, true)
 	if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 { // TODO: fast atomic
 		wakep()
 	}
@@ -185,12 +185,12 @@ func readyExecute(gp *g, traceskip int) {
 		// Preempt the current g
 		casgstatus(_g_, _Grunning, _Grunnable)
-		runqput(_g_.m.p.ptr(), _g_)
+		runqput(_g_.m.p.ptr(), _g_, false)
 		dropg()

 		// Ready gp and switch to it
 		casgstatus(gp, _Gwaiting, _Grunnable)
-		execute(gp)
+		execute(gp, false)
 	})
 }
@@ -1233,15 +1233,19 @@ func gcstopm() {
 }

 // Schedules gp to run on the current M.
+// If inheritTime is true, gp inherits the remaining time in the
+// current time slice. Otherwise, it starts a new time slice.
 // Never returns.
-func execute(gp *g) {
+func execute(gp *g, inheritTime bool) {
 	_g_ := getg()

 	casgstatus(gp, _Grunnable, _Grunning)
 	gp.waitsince = 0
 	gp.preempt = false
 	gp.stackguard0 = gp.stack.lo + _StackGuard
-	_g_.m.p.ptr().schedtick++
+	if !inheritTime {
+		_g_.m.p.ptr().schedtick++
+	}
 	_g_.m.curg = gp
 	gp.m = _g_.m
@@ -1260,7 +1264,7 @@ func execute(gp *g) {

 // Finds a runnable goroutine to execute.
 // Tries to steal from other P's, get g from global queue, poll network.
-func findrunnable() *g {
+func findrunnable() (gp *g, inheritTime bool) {
 	_g_ := getg()

 top:
@@ -1275,8 +1279,8 @@ top:
 	}

 	// local runq
-	if gp := runqget(_g_.m.p.ptr()); gp != nil {
-		return gp
+	if gp, inheritTime := runqget(_g_.m.p.ptr()); gp != nil {
+		return gp, inheritTime
 	}

 	// global runq
@@ -1285,7 +1289,7 @@ top:
 		gp := globrunqget(_g_.m.p.ptr(), 0)
 		unlock(&sched.lock)
 		if gp != nil {
-			return gp
+			return gp, false
 		}
 	}
@@ -1303,7 +1307,7 @@ top:
 		if trace.enabled {
 			traceGoUnpark(gp, 0)
 		}
-		return gp
+		return gp, false
 	}
 }
@@ -1325,12 +1329,12 @@ top:
 		_p_ := allp[fastrand1()%uint32(gomaxprocs)]
 		var gp *g
 		if _p_ == _g_.m.p.ptr() {
-			gp = runqget(_p_)
+			gp, _ = runqget(_p_)
 		} else {
 			gp = runqsteal(_g_.m.p.ptr(), _p_)
 		}
 		if gp != nil {
-			return gp
+			return gp, false
 		}
 	}
 stop:
@@ -1344,7 +1348,7 @@ stop:
 		if trace.enabled {
 			traceGoUnpark(gp, 0)
 		}
-		return gp
+		return gp, false
 	}

 	// return P and block
@@ -1356,7 +1360,7 @@ stop:
 	if sched.runqsize != 0 {
 		gp := globrunqget(_g_.m.p.ptr(), 0)
 		unlock(&sched.lock)
-		return gp
+		return gp, false
 	}
 	_p_ := releasep()
 	pidleput(_p_)
@@ -1402,7 +1406,7 @@ stop:
 		if trace.enabled {
 			traceGoUnpark(gp, 0)
 		}
-		return gp
+		return gp, false
 	}
 	injectglist(gp)
 }
@@ -1468,7 +1472,7 @@ func schedule() {
 	if _g_.m.lockedg != nil {
 		stoplockedm()
-		execute(_g_.m.lockedg) // Never returns.
+		execute(_g_.m.lockedg, false) // Never returns.
 	}

 top:
@@ -1478,6 +1482,7 @@ top:
 	}

 	var gp *g
+	var inheritTime bool

 	if trace.enabled || trace.shutdown {
 		gp = traceReader()
 		if gp != nil {
@@ -1506,13 +1511,13 @@ top:
 		}
 	}
 	if gp == nil {
-		gp = runqget(_g_.m.p.ptr())
+		gp, inheritTime = runqget(_g_.m.p.ptr())
 		if gp != nil && _g_.m.spinning {
 			throw("schedule: spinning with local work")
 		}
 	}
 	if gp == nil {
-		gp = findrunnable() // blocks until work is available
+		gp, inheritTime = findrunnable() // blocks until work is available
 		resetspinning()
 	}
@@ -1523,7 +1528,7 @@ top:
 		goto top
 	}

-	execute(gp)
+	execute(gp, inheritTime)
 }

 // dropg removes the association between m and the current goroutine m->curg (gp for short).
@@ -1568,7 +1573,7 @@ func park_m(gp *g) {
 				traceGoUnpark(gp, 2)
 			}
 			casgstatus(gp, _Gwaiting, _Grunnable)
-			execute(gp) // Schedule it back, never returns.
+			execute(gp, true) // Schedule it back, never returns.
 		}
 	}
 	schedule()
@@ -2007,12 +2012,12 @@ func exitsyscall0(gp *g) {
 	unlock(&sched.lock)
 	if _p_ != nil {
 		acquirep(_p_)
-		execute(gp) // Never returns.
+		execute(gp, false) // Never returns.
 	}
 	if _g_.m.lockedg != nil {
 		// Wait until another thread schedules gp and so m again.
 		stoplockedm()
-		execute(gp) // Never returns.
+		execute(gp, false) // Never returns.
 	}
 	stopm()
 	schedule() // Never returns.
@@ -2168,7 +2173,7 @@ func newproc1(fn *funcval, argp *uint8, narg int32, nret int32, callerpc uintptr
 	if trace.enabled {
 		traceGoCreate(newg, newg.startpc)
 	}

-	runqput(_p_, newg)
+	runqput(_p_, newg, true)
 	if atomicload(&sched.npidle) != 0 && atomicload(&sched.nmspinning) == 0 && unsafe.Pointer(fn.fn) != unsafe.Pointer(funcPC(main)) { // TODO: fast atomic
 		wakep()
@@ -2596,12 +2601,11 @@ func procresize(nprocs int32) *p {
 		p.runqtail--
 		gp := p.runq[p.runqtail%uint32(len(p.runq))]
 		// push onto head of global queue
-		gp.schedlink = sched.runqhead
-		sched.runqhead.set(gp)
-		if sched.runqtail == 0 {
-			sched.runqtail.set(gp)
-		}
-		sched.runqsize++
+		globrunqputhead(gp)
+	}
+	if p.runnext != 0 {
+		globrunqputhead(p.runnext.ptr())
+		p.runnext = 0
 	}
 	// if there's a background worker, make it runnable and put
 	// it on the global queue so it can clean itself up
@@ -3150,6 +3154,19 @@ func globrunqput(gp *g) {
 	sched.runqsize++
 }

+// Put gp at the head of the global runnable queue.
+// Sched must be locked.
+// May run during STW, so write barriers are not allowed.
+//go:nowritebarrier
+func globrunqputhead(gp *g) {
+	gp.schedlink = sched.runqhead
+	sched.runqhead.set(gp)
+	if sched.runqtail == 0 {
+		sched.runqtail.set(gp)
+	}
+	sched.runqsize++
+}
+
 // Put a batch of runnable goroutines on the global runnable queue.
 // Sched must be locked.
 func globrunqputbatch(ghead *g, gtail *g, n int32) {
@@ -3192,7 +3209,7 @@ func globrunqget(_p_ *p, max int32) *g {
 	for ; n > 0; n-- {
 		gp1 := sched.runqhead.ptr()
 		sched.runqhead = gp1.schedlink
-		runqput(_p_, gp1)
+		runqput(_p_, gp1, false)
 	}
 	return gp
 }
@@ -3223,13 +3240,28 @@ func pidleget() *p {
 // runqempty returns true if _p_ has no Gs on its local run queue.
 // Note that this test is generally racy.
 func runqempty(_p_ *p) bool {
-	return _p_.runqhead == _p_.runqtail
+	return _p_.runqhead == _p_.runqtail && _p_.runnext == 0
 }

-// Try to put g on local runnable queue.
-// If it's full, put onto global queue.
+// runqput tries to put g on the local runnable queue.
+// If next is false, runqput adds g to the tail of the runnable queue.
+// If next is true, runqput puts g in the _p_.runnext slot.
+// If the run queue is full, runqput puts g on the global queue.
 // Executed only by the owner P.
-func runqput(_p_ *p, gp *g) {
+func runqput(_p_ *p, gp *g, next bool) {
+	if next {
+	retryNext:
+		oldnext := _p_.runnext
+		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
+			goto retryNext
+		}
+		if oldnext == 0 {
+			return
+		}
+		// Kick the old runnext out to the regular run queue.
+		gp = oldnext.ptr()
+	}
+
 retry:
 	h := atomicload(&_p_.runqhead) // load-acquire, synchronize with consumers
 	t := _p_.runqtail
@@ -3277,17 +3309,30 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 }

 // Get g from local runnable queue.
+// If inheritTime is true, gp should inherit the remaining time in the
+// current time slice. Otherwise, it should start a new time slice.
 // Executed only by the owner P.
-func runqget(_p_ *p) *g {
+func runqget(_p_ *p) (gp *g, inheritTime bool) {
+	// If there's a runnext, it's the next G to run.
+	for {
+		next := _p_.runnext
+		if next == 0 {
+			break
+		}
+		if _p_.runnext.cas(next, 0) {
+			return next.ptr(), true
+		}
+	}
+
 	for {
 		h := atomicload(&_p_.runqhead) // load-acquire, synchronize with other consumers
 		t := _p_.runqtail
 		if t == h {
-			return nil
+			return nil, false
 		}
 		gp := _p_.runq[h%uint32(len(_p_.runq))]
 		if cas(&_p_.runqhead, h, h+1) { // cas-release, commits consume
-			return gp
+			return gp, false
 		}
 	}
 }
@@ -3302,6 +3347,14 @@ func runqgrab(_p_ *p, batch []*g) uint32 {
 		n := t - h
 		n = n - n/2
 		if n == 0 {
+			// Try to steal from _p_.runnext.
+			if next := _p_.runnext; next != 0 {
+				if !_p_.runnext.cas(next, 0) {
+					continue
+				}
+				batch[0] = next.ptr()
+				return 1
+			}
 			return 0
 		}
 		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
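To summarize the new queue discipline outside the runtime's atomics, here is a hedged, single-threaded toy model of the three operations changed above (runqput with next set, runqget, and the thief path in runqgrab). toyP and its methods are invented for illustration; all CAS loops and memory ordering are elided, and the real runqgrab steals half the victim's ring rather than a single G:

    package main

    import "fmt"

    // toyP models the scheduler-relevant parts of a P: a FIFO ring of
    // runnable Gs plus the single runnext slot.
    type toyP struct {
    	runq    []string // stand-in for the [256]*g ring
    	runnext string   // "" means empty, like a nil guintptr
    }

    // put mirrors runqput: with next=true the G lands in runnext and
    // any previous occupant is kicked to the tail of the regular queue.
    func (p *toyP) put(g string, next bool) {
    	if next {
    		old := p.runnext
    		p.runnext = g
    		if old == "" {
    			return
    		}
    		g = old // kick the old runnext out to the regular run queue
    	}
    	p.runq = append(p.runq, g)
    }

    // get mirrors runqget: runnext is always preferred, and only a G
    // taken from runnext inherits the current time slice.
    func (p *toyP) get() (g string, inheritTime bool) {
    	if p.runnext != "" {
    		g, p.runnext = p.runnext, ""
    		return g, true
    	}
    	if len(p.runq) == 0 {
    		return "", false
    	}
    	g, p.runq = p.runq[0], p.runq[1:]
    	return g, false
    }

    // steal mirrors the runqgrab change: a thief takes runnext only
    // when the victim's regular queue is empty (and, unlike the real
    // code, takes just one G from the ring here).
    func (p *toyP) steal() (g string) {
    	if len(p.runq) > 0 {
    		g, p.runq = p.runq[0], p.runq[1:]
    		return g
    	}
    	g, p.runnext = p.runnext, ""
    	return g
    }

    func main() {
    	p := &toyP{}
    	p.put("G1", false)
    	p.put("G2", true) // G2 is the freshly readied G
    	p.put("G3", true) // G3 displaces G2, which moves to the tail
    	for {
    		g, inherit := p.get()
    		if g == "" {
    			break
    		}
    		fmt.Println(g, "inheritTime:", inherit)
    	}
    	// Prints G3 with inheritTime true, then G1 and G2 with false.
    }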
@@ -3347,19 +3400,19 @@ func testSchedLocalQueue() {
 	_p_ := new(p)
 	gs := make([]g, len(_p_.runq))
 	for i := 0; i < len(_p_.runq); i++ {
-		if runqget(_p_) != nil {
+		if g, _ := runqget(_p_); g != nil {
 			throw("runq is not empty initially")
 		}
 		for j := 0; j < i; j++ {
-			runqput(_p_, &gs[i])
+			runqput(_p_, &gs[i], false)
 		}
 		for j := 0; j < i; j++ {
-			if runqget(_p_) != &gs[i] {
+			if g, _ := runqget(_p_); g != &gs[i] {
 				print("bad element at iter ", i, "/", j, "\n")
 				throw("bad element")
 			}
 		}
-		if runqget(_p_) != nil {
+		if g, _ := runqget(_p_); g != nil {
 			throw("runq is not empty afterwards")
 		}
 	}
@@ -3372,7 +3425,7 @@ func testSchedLocalQueueSteal() {
 	for i := 0; i < len(p1.runq); i++ {
 		for j := 0; j < i; j++ {
 			gs[j].sig = 0
-			runqput(p1, &gs[j])
+			runqput(p1, &gs[j], false)
 		}
 		gp := runqsteal(p2, p1)
 		s := 0
@@ -3381,7 +3434,7 @@ func testSchedLocalQueueSteal() {
 			gp.sig++
 		}
 		for {
-			gp = runqget(p2)
+			gp, _ = runqget(p2)
 			if gp == nil {
 				break
 			}
@@ -3389,7 +3442,7 @@ func testSchedLocalQueueSteal() {
 			gp.sig++
 		}
 		for {
-			gp = runqget(p1)
+			gp, _ = runqget(p1)
 			if gp == nil {
 				break
 			}
--- a/src/runtime/proc_test.go
+++ b/src/runtime/proc_test.go
@@ -292,6 +292,57 @@ func main() {
 }
 `

+func TestPingPongHog(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping in -short mode")
+	}
+
+	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
+	done := make(chan bool)
+	hogChan, lightChan := make(chan bool), make(chan bool)
+	hogCount, lightCount := 0, 0
+
+	run := func(limit int, counter *int, wake chan bool) {
+		for {
+			select {
+			case <-done:
+				return
+
+			case <-wake:
+				for i := 0; i < limit; i++ {
+					*counter++
+				}
+				wake <- true
+			}
+		}
+	}
+
+	// Start two co-scheduled hog goroutines.
+	for i := 0; i < 2; i++ {
+		go run(1e6, &hogCount, hogChan)
+	}
+
+	// Start two co-scheduled light goroutines.
+	for i := 0; i < 2; i++ {
+		go run(1e3, &lightCount, lightChan)
+	}
+
+	// Start goroutine pairs and wait for a few preemption rounds.
+	hogChan <- true
+	lightChan <- true
+	time.Sleep(100 * time.Millisecond)
+	close(done)
+	<-hogChan
+	<-lightChan
+
+	// Check that hogCount and lightCount are within a factor of
+	// 2, which indicates that both pairs of goroutines handed off
+	// the P within a time-slice to their buddy.
+	if hogCount > lightCount*2 || lightCount > hogCount*2 {
+		t.Fatalf("want hogCount/lightCount in [0.5, 2]; got %d/%d = %g", hogCount, lightCount, float64(hogCount)/float64(lightCount))
+	}
+}
+
 func BenchmarkPingPongHog(b *testing.B) {
 	defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(1))
--- a/src/runtime/runtime2.go
+++ b/src/runtime/runtime2.go
@@ -129,6 +129,9 @@ type guintptr uintptr

 func (gp guintptr) ptr() *g   { return (*g)(unsafe.Pointer(gp)) }
 func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
+func (gp *guintptr) cas(old, new guintptr) bool {
+	return casuintptr((*uintptr)(unsafe.Pointer(gp)), uintptr(old), uintptr(new))
+}

 type puintptr uintptr
@@ -350,10 +353,20 @@ type p struct {
 	goidcache    uint64
 	goidcacheend uint64

-	// Queue of runnable goroutines.
+	// Queue of runnable goroutines. Accessed without lock.
 	runqhead uint32
 	runqtail uint32
 	runq     [256]*g
+	// runnext, if non-nil, is a runnable G that was ready'd by
+	// the current G and should be run next instead of what's in
+	// runq if there's time remaining in the running G's time
+	// slice. It will inherit the time left in the current time
+	// slice. If a set of goroutines is locked in a
+	// communicate-and-wait pattern, this schedules that set as a
+	// unit and eliminates the (potentially large) scheduling
+	// latency that otherwise arises from adding the ready'd
+	// goroutines to the end of the run queue.
+	runnext guintptr

 	// Available G's (status == Gdead)
 	gfree *g
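To exercise the change locally, the standard go tool invocations below should work (assuming a Go tree with this commit applied):

    go test -run TestPingPongHog runtime
    go test -run '^$' -bench PingPongHog runtime

On a scheduler without the runnext handoff, the test should fail, since the hog pair would accumulate far more than twice the light pair's count.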