Commit 2ae953bf authored by Roger Peppe's avatar Roger Peppe Committed by Rob Pike

time: allow cancelling of After events.

Also simplify sleeper algorithm and poll
occasionally so redundant sleeper goroutines
will quit sooner.

R=r, niemeyer, r2
CC=golang-dev
https://golang.org/cl/4063043
parent 66905bd9
...@@ -11,20 +11,40 @@ import ( ...@@ -11,20 +11,40 @@ import (
"container/heap" "container/heap"
) )
// The event type represents a single After or AfterFunc event. // The Timer type represents a single event.
type event struct { // When the Timer expires, the current time will be sent on C
// unless the Timer represents an AfterFunc event.
type Timer struct {
C <-chan int64
t int64 // The absolute time that the event should fire. t int64 // The absolute time that the event should fire.
f func(int64) // The function to call when the event fires. f func(int64) // The function to call when the event fires.
sleeping bool // A sleeper is sleeping for this event. i int // The event's index inside eventHeap.
} }
type eventHeap []*event type timerHeap []*Timer
var events eventHeap // forever is the absolute time (in ns) of an event that is forever away.
var eventMutex sync.Mutex const forever = 1 << 62
// maxSleepTime is the maximum length of time that a sleeper
// sleeps for before checking if it is defunct.
const maxSleepTime = 1e9
var (
// timerMutex guards the variables inside this var group.
timerMutex sync.Mutex
// timers holds a binary heap of pending events, terminated with a sentinel.
timers timerHeap
// currentSleeper is an ever-incrementing counter which represents
// the current sleeper. It allows older sleepers to detect that they are
// defunct and exit.
currentSleeper int64
)
func init() { func init() {
events.Push(&event{1 << 62, nil, true}) // sentinel timers.Push(&Timer{t: forever}) // sentinel
} }
// Sleep pauses the current goroutine for at least ns nanoseconds. // Sleep pauses the current goroutine for at least ns nanoseconds.
...@@ -51,101 +71,133 @@ func sleep(t, ns int64) (int64, os.Error) { ...@@ -51,101 +71,133 @@ func sleep(t, ns int64) (int64, os.Error) {
return t, nil return t, nil
} }
// NewTimer creates a new Timer that will send
// the current time on its channel after at least ns nanoseconds.
func NewTimer(ns int64) *Timer {
c := make(chan int64, 1)
e := after(ns, func(t int64) { c <- t })
e.C = c
return e
}
// After waits at least ns nanoseconds before sending the current time // After waits at least ns nanoseconds before sending the current time
// on the returned channel. // on the returned channel.
// It is equivalent to NewTimer(ns).C.
func After(ns int64) <-chan int64 { func After(ns int64) <-chan int64 {
c := make(chan int64, 1) return NewTimer(ns).C
after(ns, func(t int64) { c <- t })
return c
} }
// AfterFunc waits at least ns nanoseconds before calling f // AfterFunc waits at least ns nanoseconds before calling f
// in its own goroutine. // in its own goroutine. It returns a Timer that can
func AfterFunc(ns int64, f func()) { // be used to cancel the call using its Stop method.
after(ns, func(_ int64) { func AfterFunc(ns int64, f func()) *Timer {
return after(ns, func(_ int64) {
go f() go f()
}) })
} }
// Stop prevents the Timer from firing.
// It returns true if the call stops the timer, false if the timer has already
// expired or stopped.
func (e *Timer) Stop() (ok bool) {
timerMutex.Lock()
// Avoid removing the first event in the queue so that
// we don't start a new sleeper unnecessarily.
if e.i > 0 {
heap.Remove(timers, e.i)
}
ok = e.f != nil
e.f = nil
timerMutex.Unlock()
return
}
// after is the implementation of After and AfterFunc. // after is the implementation of After and AfterFunc.
// When the current time is after ns, it calls f with the current time. // When the current time is after ns, it calls f with the current time.
// It assumes that f will not block. // It assumes that f will not block.
func after(ns int64, f func(int64)) { func after(ns int64, f func(int64)) (e *Timer) {
now := Nanoseconds()
t := Nanoseconds() + ns t := Nanoseconds() + ns
eventMutex.Lock() if ns > 0 && t < now {
t0 := events[0].t panic("time: time overflow")
heap.Push(events, &event{t, f, false})
if t < t0 {
go sleeper()
} }
eventMutex.Unlock() timerMutex.Lock()
t0 := timers[0].t
e = &Timer{nil, t, f, -1}
heap.Push(timers, e)
// Start a new sleeper if the new event is before
// the first event in the queue. If the length of time
// until the new event is at least maxSleepTime,
// then we're guaranteed that the sleeper will wake up
// in time to service it, so no new sleeper is needed.
if t0 > t && (t0 == forever || ns < maxSleepTime) {
currentSleeper++
go sleeper(currentSleeper)
}
timerMutex.Unlock()
return
} }
// sleeper continually looks at the earliest event in the queue, marks it // sleeper continually looks at the earliest event in the queue, waits until it happens,
// as sleeping, waits until it happens, then removes any events // then removes any events in the queue that are due. It stops when the queue
// in the queue that are due. It stops when it finds an event that is // is empty or when another sleeper has been started.
// already marked as sleeping. When an event is inserted before the first item, func sleeper(sleeperId int64) {
// a new sleeper is started. timerMutex.Lock()
// e := timers[0]
// Scheduling vagaries mean that sleepers may not wake up in
// exactly the order of the events that they are waiting for,
// but this does not matter as long as there are at least as
// many sleepers as events marked sleeping (invariant). This ensures that
// there is always a sleeper to service the remaining events.
//
// A sleeper will remove at least the event it has been waiting for
// unless the event has already been removed by another sleeper. Both
// cases preserve the invariant described above.
func sleeper() {
eventMutex.Lock()
e := events[0]
for !e.sleeping {
t := Nanoseconds() t := Nanoseconds()
for e.t != forever {
if dt := e.t - t; dt > 0 { if dt := e.t - t; dt > 0 {
e.sleeping = true if dt > maxSleepTime {
eventMutex.Unlock() dt = maxSleepTime
if nt, err := sleep(t, dt); err != nil { }
// If sleep has encountered an error, timerMutex.Unlock()
// there's not much we can do. We pretend syscall.Sleep(dt)
// that time really has advanced by the required timerMutex.Lock()
// amount and lie to the rest of the system. if currentSleeper != sleeperId {
t = e.t // Another sleeper has been started, making this one redundant.
} else { break
t = nt
} }
eventMutex.Lock()
e = events[0]
} }
e = timers[0]
t = Nanoseconds()
for t >= e.t { for t >= e.t {
if e.f != nil {
e.f(t) e.f(t)
heap.Pop(events) e.f = nil
e = events[0] }
heap.Pop(timers)
e = timers[0]
} }
} }
eventMutex.Unlock() timerMutex.Unlock()
} }
func (eventHeap) Len() int { func (timerHeap) Len() int {
return len(events) return len(timers)
} }
func (eventHeap) Less(i, j int) bool { func (timerHeap) Less(i, j int) bool {
return events[i].t < events[j].t return timers[i].t < timers[j].t
} }
func (eventHeap) Swap(i, j int) { func (timerHeap) Swap(i, j int) {
events[i], events[j] = events[j], events[i] timers[i], timers[j] = timers[j], timers[i]
timers[i].i = i
timers[j].i = j
} }
func (eventHeap) Push(x interface{}) { func (timerHeap) Push(x interface{}) {
events = append(events, x.(*event)) e := x.(*Timer)
e.i = len(timers)
timers = append(timers, e)
} }
func (eventHeap) Pop() interface{} { func (timerHeap) Pop() interface{} {
// TODO: possibly shrink array. // TODO: possibly shrink array.
n := len(events) - 1 n := len(timers) - 1
e := events[n] e := timers[n]
events[n] = nil timers[n] = nil
events = events[0:n] timers = timers[0:n]
e.i = -1
return e return e
} }
...@@ -64,6 +64,18 @@ func BenchmarkAfterFunc(b *testing.B) { ...@@ -64,6 +64,18 @@ func BenchmarkAfterFunc(b *testing.B) {
<-c <-c
} }
func BenchmarkAfter(b *testing.B) {
for i := 0; i < b.N; i++ {
<-After(1)
}
}
func BenchmarkStop(b *testing.B) {
for i := 0; i < b.N; i++ {
NewTimer(1e9).Stop()
}
}
func TestAfter(t *testing.T) { func TestAfter(t *testing.T) {
const delay = int64(100e6) const delay = int64(100e6)
start := Nanoseconds() start := Nanoseconds()
...@@ -94,6 +106,30 @@ func TestAfterTick(t *testing.T) { ...@@ -94,6 +106,30 @@ func TestAfterTick(t *testing.T) {
} }
} }
func TestAfterStop(t *testing.T) {
const msec = 1e6
AfterFunc(100*msec, func() {})
t0 := NewTimer(50 * msec)
c1 := make(chan bool, 1)
t1 := AfterFunc(150*msec, func() { c1 <- true })
c2 := After(200 * msec)
if !t0.Stop() {
t.Fatalf("failed to stop event 0")
}
if !t1.Stop() {
t.Fatalf("failed to stop event 1")
}
<-c2
_, ok0 := <-t0.C
_, ok1 := <-c1
if ok0 || ok1 {
t.Fatalf("events were not stopped")
}
if t1.Stop() {
t.Fatalf("Stop returned true twice")
}
}
var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0} var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0}
type afterResult struct { type afterResult struct {
......
...@@ -43,3 +43,14 @@ func TestTeardown(t *testing.T) { ...@@ -43,3 +43,14 @@ func TestTeardown(t *testing.T) {
ticker.Stop() ticker.Stop()
} }
} }
func BenchmarkTicker(b *testing.B) {
ticker := NewTicker(1)
b.ResetTimer()
b.StartTimer()
for i := 0; i < b.N; i++ {
<-ticker.C
}
b.StopTimer()
ticker.Stop()
}
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment