Commit 3578918b authored by Austin Clements

runtime: replace manually managed G deques with a type

There are two manually managed G deques. Abstract both of these into a
shared gQueue type. This also introduces a gList type, which we'll use
to replace several manually-managed G lists in follow-up CLs.

This makes the code more readable and maintainable. gcFlushBgCredit in
particular becomes much easier to follow. It also makes it easier to
introduce more G queues in the future. Finally, the gList type clearly
distinguishes between lists of Gs and individual Gs; currently both
are represented by a *g, which can easily lead to confusion and bugs.
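
For readers outside the runtime, the pattern behind gQueue is an
intrusive FIFO: every G already carries a schedlink field, so a queue
is nothing more than a head and a tail pointer, and push/pop never
allocate. Below is a minimal, self-contained sketch of the idea over a
toy node type (the names node/queue are illustrative only; the real
code links Gs through g.schedlink and uses guintptr rather than
ordinary pointers to avoid write barriers):

package main

import "fmt"

// node stands in for the runtime's g; next plays the role of g.schedlink.
type node struct {
    id   int
    next *node
}

// queue is an intrusive FIFO like gQueue: it owns no storage and only
// links nodes through their embedded next field.
type queue struct {
    head, tail *node
}

func (q *queue) empty() bool { return q.head == nil }

// pushBack links n at the tail, as gQueue.pushBack does for a G.
func (q *queue) pushBack(n *node) {
    n.next = nil
    if q.tail != nil {
        q.tail.next = n
    } else {
        q.head = n
    }
    q.tail = n
}

// pop unlinks and returns the head, or nil if the queue is empty.
func (q *queue) pop() *node {
    n := q.head
    if n != nil {
        q.head = n.next
        if q.head == nil {
            q.tail = nil
        }
    }
    return n
}

func main() {
    var q queue
    for i := 1; i <= 3; i++ {
        q.pushBack(&node{id: i})
    }
    for !q.empty() {
        fmt.Println(q.pop().id) // 1, 2, 3: FIFO order
    }
}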

Change-Id: Ic7798841b405d311fc8b6aa5a958ffa4c7993c6c
Reviewed-on: https://go-review.googlesource.com/129396
Run-TryBot: Austin Clements <austin@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rick Hudson <rlh@golang.org>
parent 723479bc
src/runtime/mgc.go
@@ -1023,8 +1023,8 @@ var work struct {
 	// there was neither enough credit to steal or enough work to
 	// do.
 	assistQueue struct {
-		lock       mutex
-		head, tail guintptr
+		lock mutex
+		q    gQueue
 	}
 
 	// sweepWaiters is a list of blocked goroutines to wake when
src/runtime/mgcmark.go
@@ -602,9 +602,7 @@ func gcAssistAlloc1(gp *g, scanWork int64) {
 // new assists from going to sleep after this point.
 func gcWakeAllAssists() {
 	lock(&work.assistQueue.lock)
-	injectglist(work.assistQueue.head.ptr())
-	work.assistQueue.head.set(nil)
-	work.assistQueue.tail.set(nil)
+	injectglist(work.assistQueue.q.popList().head.ptr())
 	unlock(&work.assistQueue.lock)
 }
@@ -625,24 +623,17 @@ func gcParkAssist() bool {
 	}
 
 	gp := getg()
-	oldHead, oldTail := work.assistQueue.head, work.assistQueue.tail
-	if oldHead == 0 {
-		work.assistQueue.head.set(gp)
-	} else {
-		oldTail.ptr().schedlink.set(gp)
-	}
-	work.assistQueue.tail.set(gp)
-	gp.schedlink.set(nil)
+	oldList := work.assistQueue.q
+	work.assistQueue.q.pushBack(gp)
 
 	// Recheck for background credit now that this G is in
 	// the queue, but can still back out. This avoids a
 	// race in case background marking has flushed more
 	// credit since we checked above.
 	if atomic.Loadint64(&gcController.bgScanCredit) > 0 {
-		work.assistQueue.head = oldHead
-		work.assistQueue.tail = oldTail
-		if oldTail != 0 {
-			oldTail.ptr().schedlink.set(nil)
+		work.assistQueue.q = oldList
+		if oldList.tail != 0 {
+			oldList.tail.ptr().schedlink.set(nil)
 		}
 		unlock(&work.assistQueue.lock)
 		return false
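
gcParkAssist can take that cheap snapshot because copying a gQueue
copies only two words; backing out then means restoring the snapshot
and severing the stale link the old tail still holds to the just-pushed
G. A sketch of that save/restore dance with toy types (not the
runtime's):

package main

import "fmt"

type node struct {
    id   int
    next *node
}

type queue struct{ head, tail *node }

func (q *queue) pushBack(n *node) {
    n.next = nil
    if q.tail != nil {
        q.tail.next = n
    } else {
        q.head = n
    }
    q.tail = n
}

func main() {
    var q queue
    a, b := &node{id: 1}, &node{id: 2}
    q.pushBack(a)

    old := q      // snapshot: a queue copy is just two pointer fields
    q.pushBack(b) // b is now linked after old.tail (a)

    // Back out, as gcParkAssist does when credit arrived in the window:
    // restore the saved head/tail, then clear the stale next pointer the
    // old tail still has into b (the schedlink.set(nil) in the diff).
    q = old
    if old.tail != nil {
        old.tail.next = nil
    }

    fmt.Println(q.head.id, q.tail.id, a.next) // 1 1 <nil>: b is unlinked
}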
@@ -663,7 +654,7 @@ func gcParkAssist() bool {
 //
 //go:nowritebarrierrec
 func gcFlushBgCredit(scanWork int64) {
-	if work.assistQueue.head == 0 {
+	if work.assistQueue.q.empty() {
 		// Fast path; there are no blocked assists. There's a
 		// small window here where an assist may add itself to
 		// the blocked queue and park. If that happens, we'll
@@ -675,23 +666,21 @@ func gcFlushBgCredit(scanWork int64) {
 	scanBytes := int64(float64(scanWork) * gcController.assistBytesPerWork)
 
 	lock(&work.assistQueue.lock)
-	gp := work.assistQueue.head.ptr()
-	for gp != nil && scanBytes > 0 {
+	for !work.assistQueue.q.empty() && scanBytes > 0 {
+		gp := work.assistQueue.q.pop()
 		// Note that gp.gcAssistBytes is negative because gp
 		// is in debt. Think carefully about the signs below.
 		if scanBytes+gp.gcAssistBytes >= 0 {
 			// Satisfy this entire assist debt.
 			scanBytes += gp.gcAssistBytes
 			gp.gcAssistBytes = 0
-			xgp := gp
-			gp = gp.schedlink.ptr()
-			// It's important that we *not* put xgp in
+			// It's important that we *not* put gp in
 			// runnext. Otherwise, it's possible for user
 			// code to exploit the GC worker's high
 			// scheduler priority to get itself always run
 			// before other goroutines and always in the
 			// fresh quantum started by GC.
-			ready(xgp, 0, false)
+			ready(gp, 0, false)
 		} else {
 			// Partially satisfy this assist.
 			gp.gcAssistBytes += scanBytes
@@ -700,23 +689,10 @@ func gcFlushBgCredit(scanWork int64) {
 			// back of the queue so that large assists
 			// can't clog up the assist queue and
 			// substantially delay small assists.
-			xgp := gp
-			gp = gp.schedlink.ptr()
-			if gp == nil {
-				// gp is the only assist in the queue.
-				gp = xgp
-			} else {
-				xgp.schedlink = 0
-				work.assistQueue.tail.ptr().schedlink.set(xgp)
-				work.assistQueue.tail.set(xgp)
-			}
+			work.assistQueue.q.pushBack(gp)
 			break
 		}
 	}
-	work.assistQueue.head.set(gp)
-	if gp == nil {
-		work.assistQueue.tail.set(nil)
-	}
 
 	if scanBytes > 0 {
 		// Convert from scan bytes back to work.
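
The sign convention in this loop is the subtle part: gp.gcAssistBytes
is negative while a G is in debt, so adding it to scanBytes spends
credit, and a debtor that cannot be fully paid is moved to the back of
the queue. A standalone arithmetic sketch of the same policy, with
plain int64 debts standing in for Gs:

package main

import "fmt"

func main() {
    // Debts are negative, mirroring gcAssistBytes; credit is positive.
    debts := []int64{-30, -50, -80} // three parked assists, FIFO order
    credit := int64(100)

    queue := append([]int64(nil), debts...)
    for len(queue) > 0 && credit > 0 {
        d := queue[0]
        queue = queue[1:]
        if credit+d >= 0 {
            // Enough credit to clear this whole debt. Note the sign:
            // adding a negative debt spends credit.
            credit += d
        } else {
            // Partially pay it down and requeue at the BACK, so one
            // big debtor can't starve the small assists behind it.
            d += credit
            credit = 0
            queue = append(queue, d)
        }
    }
    fmt.Println(credit, queue) // 0 [-60]: -30 and -50 paid, -80 partly paid
}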
src/runtime/proc.go
@@ -4667,13 +4667,7 @@ func mget() *m {
 // May run during STW, so write barriers are not allowed.
 //go:nowritebarrierrec
 func globrunqput(gp *g) {
-	gp.schedlink = 0
-	if sched.runqtail != 0 {
-		sched.runqtail.ptr().schedlink.set(gp)
-	} else {
-		sched.runqhead.set(gp)
-	}
-	sched.runqtail.set(gp)
+	sched.runq.pushBack(gp)
 	sched.runqsize++
 }
@@ -4682,25 +4676,17 @@ func globrunqput(gp *g) {
 // May run during STW, so write barriers are not allowed.
 //go:nowritebarrierrec
 func globrunqputhead(gp *g) {
-	gp.schedlink = sched.runqhead
-	sched.runqhead.set(gp)
-	if sched.runqtail == 0 {
-		sched.runqtail.set(gp)
-	}
+	sched.runq.push(gp)
 	sched.runqsize++
 }
 
 // Put a batch of runnable goroutines on the global runnable queue.
+// This clears *batch.
 // Sched must be locked.
-func globrunqputbatch(ghead *g, gtail *g, n int32) {
-	gtail.schedlink = 0
-	if sched.runqtail != 0 {
-		sched.runqtail.ptr().schedlink.set(ghead)
-	} else {
-		sched.runqhead.set(ghead)
-	}
-	sched.runqtail.set(gtail)
+func globrunqputbatch(batch *gQueue, n int32) {
+	sched.runq.pushBackAll(*batch)
 	sched.runqsize += n
+	*batch = gQueue{}
 }
 
 // Try get a batch of G's from the global runnable queue.
@@ -4722,16 +4708,11 @@ func globrunqget(_p_ *p, max int32) *g {
 	}
 
 	sched.runqsize -= n
-	if sched.runqsize == 0 {
-		sched.runqtail = 0
-	}
 
-	gp := sched.runqhead.ptr()
-	sched.runqhead = gp.schedlink
+	gp := sched.runq.pop()
 	n--
 	for ; n > 0; n-- {
-		gp1 := sched.runqhead.ptr()
-		sched.runqhead = gp1.schedlink
+		gp1 := sched.runq.pop()
 		runqput(_p_, gp1, false)
 	}
 	return gp
@@ -4859,10 +4840,13 @@ func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
 	for i := uint32(0); i < n; i++ {
 		batch[i].schedlink.set(batch[i+1])
 	}
+	var q gQueue
+	q.head.set(batch[0])
+	q.tail.set(batch[n])
 
 	// Now put the batch on global queue.
 	lock(&sched.lock)
-	globrunqputbatch(batch[0], batch[n], int32(n+1))
+	globrunqputbatch(&q, int32(n+1))
 	unlock(&sched.lock)
 	return true
 }
@@ -4974,6 +4958,107 @@ func runqsteal(_p_, p2 *p, stealRunNextG bool) *g {
 	return gp
 }
 
+// A gQueue is a deque of Gs linked through g.schedlink. A G can only
+// be on one gQueue or gList at a time.
+type gQueue struct {
+	head guintptr
+	tail guintptr
+}
+
+// empty returns true if q is empty.
+func (q *gQueue) empty() bool {
+	return q.head == 0
+}
+
+// push adds gp to the head of q.
+func (q *gQueue) push(gp *g) {
+	gp.schedlink = q.head
+	q.head.set(gp)
+	if q.tail == 0 {
+		q.tail.set(gp)
+	}
+}
+
+// pushBack adds gp to the tail of q.
+func (q *gQueue) pushBack(gp *g) {
+	gp.schedlink = 0
+	if q.tail != 0 {
+		q.tail.ptr().schedlink.set(gp)
+	} else {
+		q.head.set(gp)
+	}
+	q.tail.set(gp)
+}
+
+// pushBackAll adds all Gs in q2 to the tail of q. After this q2 must
+// not be used.
+func (q *gQueue) pushBackAll(q2 gQueue) {
+	if q2.tail == 0 {
+		return
+	}
+	q2.tail.ptr().schedlink = 0
+	if q.tail != 0 {
+		q.tail.ptr().schedlink = q2.head
+	} else {
+		q.head = q2.head
+	}
+	q.tail = q2.tail
+}
+
+// pop removes and returns the head of queue q. It returns nil if
+// q is empty.
+func (q *gQueue) pop() *g {
+	gp := q.head.ptr()
+	if gp != nil {
+		q.head = gp.schedlink
+		if q.head == 0 {
+			q.tail = 0
+		}
+	}
+	return gp
+}
+
+// popList takes all Gs in q and returns them as a gList.
+func (q *gQueue) popList() gList {
+	stack := gList{q.head}
+	*q = gQueue{}
+	return stack
+}
+
+// A gList is a list of Gs linked through g.schedlink. A G can only be
+// on one gQueue or gList at a time.
+type gList struct {
+	head guintptr
+}
+
+// empty returns true if l is empty.
+func (l *gList) empty() bool {
+	return l.head == 0
+}
+
+// push adds gp to the head of l.
+func (l *gList) push(gp *g) {
+	gp.schedlink = l.head
+	l.head.set(gp)
+}
+
+// pushAll prepends all Gs in q to l.
+func (l *gList) pushAll(q gQueue) {
+	if !q.empty() {
+		q.tail.ptr().schedlink = l.head
+		l.head = q.head
+	}
+}
+
+// pop removes and returns the head of l. If l is empty, it returns nil.
+func (l *gList) pop() *g {
+	gp := l.head.ptr()
+	if gp != nil {
+		l.head = gp.schedlink
+	}
+	return gp
+}
+
 //go:linkname setMaxThreads runtime/debug.setMaxThreads
 func setMaxThreads(in int) (out int) {
 	lock(&sched.lock)
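
A gList drops the tail pointer, so it is the cheaper type when all
that's needed is LIFO access or a whole-list handoff; popList is what
lets gcWakeAllAssists drain the assist queue in O(1) before handing it
to injectglist. A sketch of that handoff with toy stand-ins for g,
gQueue, and gList (the real versions link Gs through g.schedlink):

package main

import "fmt"

type node struct {
    id   int
    next *node
}

type queue struct{ head, tail *node }

type list struct{ head *node }

func (q *queue) pushBack(n *node) {
    n.next = nil
    if q.tail != nil {
        q.tail.next = n
    } else {
        q.head = n
    }
    q.tail = n
}

// popList drains the queue into a list in one O(1) move: the list
// takes over the existing chain and the queue is reset to empty.
func (q *queue) popList() list {
    l := list{q.head}
    *q = queue{}
    return l
}

func (l *list) pop() *node {
    n := l.head
    if n != nil {
        l.head = n.next
    }
    return n
}

func main() {
    var q queue
    for i := 1; i <= 3; i++ {
        q.pushBack(&node{id: i})
    }
    l := q.popList() // q is now empty; l owns the whole chain
    for n := l.pop(); n != nil; n = l.pop() {
        fmt.Print(n.id, " ") // 1 2 3
    }
    fmt.Println(q.head == nil && q.tail == nil) // true
}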
src/runtime/runtime2.go
@@ -574,8 +574,7 @@ type schedt struct {
 	nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
 
 	// Global runnable queue.
-	runqhead guintptr
-	runqtail guintptr
+	runq     gQueue
 	runqsize int32
 
 	// Global cache of dead G's.