Commit 48eb79ec authored by Ian Lance Taylor's avatar Ian Lance Taylor

runtime: add new modtimer function

This adds a new field to P, adjustTimers, that tells the P that one of
its existing timers was modified to be earlier, and that it therefore
needs to resort them.

Updates #27707

Change-Id: I4c5f5b51ed116f1d898d3f87cdddfa1b552337f8
Reviewed-on: https://go-review.googlesource.com/c/go/+/171832
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: default avatarMichael Knyszek <mknyszek@google.com>
parent a813d3c7
...@@ -4122,6 +4122,7 @@ func (pp *p) destroy() { ...@@ -4122,6 +4122,7 @@ func (pp *p) destroy() {
// The world is stopped so we don't need to hold timersLock. // The world is stopped so we don't need to hold timersLock.
moveTimers(plocal, pp.timers) moveTimers(plocal, pp.timers)
pp.timers = nil pp.timers = nil
pp.adjustTimers = 0
} }
// If there's a background worker, make it runnable and put // If there's a background worker, make it runnable and put
// it on the global queue so it can clean itself up. // it on the global queue so it can clean itself up.
......
...@@ -607,6 +607,12 @@ type p struct { ...@@ -607,6 +607,12 @@ type p struct {
// Must hold timersLock to access. // Must hold timersLock to access.
timers []*timer timers []*timer
// Number of timerModifiedEarlier timers on P's heap.
// This should only be modified while holding timersLock,
// or while the timer status is in a transient state
// such as timerModifying.
adjustTimers uint32
pad cpu.CacheLinePad pad cpu.CacheLinePad
} }
......
...@@ -131,6 +131,16 @@ type timersBucket struct { ...@@ -131,6 +131,16 @@ type timersBucket struct {
// timerRunning -> wait until status changes // timerRunning -> wait until status changes
// timerMoving -> wait until status changes // timerMoving -> wait until status changes
// timerModifying -> panic: concurrent deltimer/modtimer calls // timerModifying -> panic: concurrent deltimer/modtimer calls
// modtimer:
// timerWaiting -> timerModifying -> timerModifiedXX
// timerModifiedXX -> timerModifying -> timerModifiedYY
// timerNoStatus -> timerWaiting
// timerRemoved -> timerWaiting
// timerRunning -> wait until status changes
// timerMoving -> wait until status changes
// timerRemoving -> wait until status changes
// timerDeleted -> panic: concurrent modtimer/deltimer calls
// timerModifying -> panic: concurrent modtimer calls
// Values for the timer status field. // Values for the timer status field.
const ( const (
...@@ -270,6 +280,11 @@ func addtimer(t *timer) { ...@@ -270,6 +280,11 @@ func addtimer(t *timer) {
} }
t.status = timerWaiting t.status = timerWaiting
addInitializedTimer(t)
}
// addInitializedTimer adds an initialized timer to the current P.
func addInitializedTimer(t *timer) {
when := t.when when := t.when
pp := getg().m.p.ptr() pp := getg().m.p.ptr()
...@@ -363,7 +378,9 @@ func deltimer(t *timer) bool { ...@@ -363,7 +378,9 @@ func deltimer(t *timer) bool {
return true return true
} }
case timerModifiedEarlier: case timerModifiedEarlier:
tpp := t.pp.ptr()
if atomic.Cas(&t.status, s, timerModifying) { if atomic.Cas(&t.status, s, timerModifying) {
atomic.Xadd(&tpp.adjustTimers, -1)
if !atomic.Cas(&t.status, timerModifying, timerDeleted) { if !atomic.Cas(&t.status, timerModifying, timerDeleted) {
badTimer() badTimer()
} }
...@@ -438,12 +455,94 @@ func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { ...@@ -438,12 +455,94 @@ func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {
return true, ok return true, ok
} }
// modtimer modifies an existing timer.
// This is called by the netpoll code.
func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) { func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
if oldTimers { if oldTimers {
modtimerOld(t, when, period, f, arg, seq) modtimerOld(t, when, period, f, arg, seq)
return return
} }
throw("new modtimer not yet implemented")
if when < 0 {
when = maxWhen
}
status := uint32(timerNoStatus)
wasRemoved := false
loop:
for {
switch status = atomic.Load(&t.status); status {
case timerWaiting, timerModifiedEarlier, timerModifiedLater:
if atomic.Cas(&t.status, status, timerModifying) {
break loop
}
case timerNoStatus, timerRemoved:
// Timer was already run and t is no longer in a heap.
// Act like addtimer.
wasRemoved = true
atomic.Store(&t.status, timerWaiting)
break loop
case timerRunning, timerRemoving, timerMoving:
// The timer is being run or moved, by a different P.
// Wait for it to complete.
osyield()
case timerDeleted:
// Simultaneous calls to modtimer and deltimer.
badTimer()
case timerModifying:
// Multiple simultaneous calls to modtimer.
badTimer()
default:
badTimer()
}
}
t.period = period
t.f = f
t.arg = arg
t.seq = seq
if wasRemoved {
t.when = when
addInitializedTimer(t)
} else {
// The timer is in some other P's heap, so we can't change
// the when field. If we did, the other P's heap would
// be out of order. So we put the new when value in the
// nextwhen field, and let the other P set the when field
// when it is prepared to resort the heap.
t.nextwhen = when
newStatus := uint32(timerModifiedLater)
if when < t.when {
newStatus = timerModifiedEarlier
}
// Update the adjustTimers field. Subtract one if we
// are removing a timerModifiedEarlier, add one if we
// are adding a timerModifiedEarlier.
tpp := t.pp.ptr()
adjust := int32(0)
if status == timerModifiedEarlier {
adjust--
}
if newStatus == timerModifiedEarlier {
adjust++
}
if adjust != 0 {
atomic.Xadd(&tpp.adjustTimers, adjust)
}
// Set the new status of the timer.
if !atomic.Cas(&t.status, timerModifying, newStatus) {
badTimer()
}
// If the new status is earlier, wake up the poller.
if newStatus == timerModifiedEarlier {
wakeNetPoller(when)
}
}
} }
func modtimerOld(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) { func modtimerOld(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment