diff --git a/src/pkg/time/sleep.go b/src/pkg/time/sleep.go index 3538775adf..833552d684 100644 --- a/src/pkg/time/sleep.go +++ b/src/pkg/time/sleep.go @@ -11,20 +11,40 @@ import ( "container/heap" ) -// The event type represents a single After or AfterFunc event. -type event struct { - t int64 // The absolute time that the event should fire. - f func(int64) // The function to call when the event fires. - sleeping bool // A sleeper is sleeping for this event. +// The Timer type represents a single event. +// When the Timer expires, the current time will be sent on C +// unless the Timer represents an AfterFunc event. +type Timer struct { + C <-chan int64 + t int64 // The absolute time that the event should fire. + f func(int64) // The function to call when the event fires. + i int // The event's index inside eventHeap. } -type eventHeap []*event +type timerHeap []*Timer -var events eventHeap -var eventMutex sync.Mutex +// forever is the absolute time (in ns) of an event that is forever away. +const forever = 1 << 62 + +// maxSleepTime is the maximum length of time that a sleeper +// sleeps for before checking if it is defunct. +const maxSleepTime = 1e9 + +var ( + // timerMutex guards the variables inside this var group. + timerMutex sync.Mutex + + // timers holds a binary heap of pending events, terminated with a sentinel. + timers timerHeap + + // currentSleeper is an ever-incrementing counter which represents + // the current sleeper. It allows older sleepers to detect that they are + // defunct and exit. + currentSleeper int64 +) func init() { - events.Push(&event{1 << 62, nil, true}) // sentinel + timers.Push(&Timer{t: forever}) // sentinel } // Sleep pauses the current goroutine for at least ns nanoseconds. @@ -51,101 +71,133 @@ func sleep(t, ns int64) (int64, os.Error) { return t, nil } +// NewTimer creates a new Timer that will send +// the current time on its channel after at least ns nanoseconds. +func NewTimer(ns int64) *Timer { + c := make(chan int64, 1) + e := after(ns, func(t int64) { c <- t }) + e.C = c + return e +} + // After waits at least ns nanoseconds before sending the current time // on the returned channel. +// It is equivalent to NewTimer(ns).C. func After(ns int64) <-chan int64 { - c := make(chan int64, 1) - after(ns, func(t int64) { c <- t }) - return c + return NewTimer(ns).C } // AfterFunc waits at least ns nanoseconds before calling f -// in its own goroutine. -func AfterFunc(ns int64, f func()) { - after(ns, func(_ int64) { +// in its own goroutine. It returns a Timer that can +// be used to cancel the call using its Stop method. +func AfterFunc(ns int64, f func()) *Timer { + return after(ns, func(_ int64) { go f() }) } +// Stop prevents the Timer from firing. +// It returns true if the call stops the timer, false if the timer has already +// expired or stopped. +func (e *Timer) Stop() (ok bool) { + timerMutex.Lock() + // Avoid removing the first event in the queue so that + // we don't start a new sleeper unnecessarily. + if e.i > 0 { + heap.Remove(timers, e.i) + } + ok = e.f != nil + e.f = nil + timerMutex.Unlock() + return +} + // after is the implementation of After and AfterFunc. // When the current time is after ns, it calls f with the current time. // It assumes that f will not block. -func after(ns int64, f func(int64)) { +func after(ns int64, f func(int64)) (e *Timer) { + now := Nanoseconds() t := Nanoseconds() + ns - eventMutex.Lock() - t0 := events[0].t - heap.Push(events, &event{t, f, false}) - if t < t0 { - go sleeper() + if ns > 0 && t < now { + panic("time: time overflow") } - eventMutex.Unlock() + timerMutex.Lock() + t0 := timers[0].t + e = &Timer{nil, t, f, -1} + heap.Push(timers, e) + // Start a new sleeper if the new event is before + // the first event in the queue. If the length of time + // until the new event is at least maxSleepTime, + // then we're guaranteed that the sleeper will wake up + // in time to service it, so no new sleeper is needed. + if t0 > t && (t0 == forever || ns < maxSleepTime) { + currentSleeper++ + go sleeper(currentSleeper) + } + timerMutex.Unlock() + return } -// sleeper continually looks at the earliest event in the queue, marks it -// as sleeping, waits until it happens, then removes any events -// in the queue that are due. It stops when it finds an event that is -// already marked as sleeping. When an event is inserted before the first item, -// a new sleeper is started. -// -// Scheduling vagaries mean that sleepers may not wake up in -// exactly the order of the events that they are waiting for, -// but this does not matter as long as there are at least as -// many sleepers as events marked sleeping (invariant). This ensures that -// there is always a sleeper to service the remaining events. -// -// A sleeper will remove at least the event it has been waiting for -// unless the event has already been removed by another sleeper. Both -// cases preserve the invariant described above. -func sleeper() { - eventMutex.Lock() - e := events[0] - for !e.sleeping { - t := Nanoseconds() +// sleeper continually looks at the earliest event in the queue, waits until it happens, +// then removes any events in the queue that are due. It stops when the queue +// is empty or when another sleeper has been started. +func sleeper(sleeperId int64) { + timerMutex.Lock() + e := timers[0] + t := Nanoseconds() + for e.t != forever { if dt := e.t - t; dt > 0 { - e.sleeping = true - eventMutex.Unlock() - if nt, err := sleep(t, dt); err != nil { - // If sleep has encountered an error, - // there's not much we can do. We pretend - // that time really has advanced by the required - // amount and lie to the rest of the system. - t = e.t - } else { - t = nt + if dt > maxSleepTime { + dt = maxSleepTime + } + timerMutex.Unlock() + syscall.Sleep(dt) + timerMutex.Lock() + if currentSleeper != sleeperId { + // Another sleeper has been started, making this one redundant. + break } - eventMutex.Lock() - e = events[0] } + e = timers[0] + t = Nanoseconds() for t >= e.t { - e.f(t) - heap.Pop(events) - e = events[0] + if e.f != nil { + e.f(t) + e.f = nil + } + heap.Pop(timers) + e = timers[0] } } - eventMutex.Unlock() + timerMutex.Unlock() } -func (eventHeap) Len() int { - return len(events) +func (timerHeap) Len() int { + return len(timers) } -func (eventHeap) Less(i, j int) bool { - return events[i].t < events[j].t +func (timerHeap) Less(i, j int) bool { + return timers[i].t < timers[j].t } -func (eventHeap) Swap(i, j int) { - events[i], events[j] = events[j], events[i] +func (timerHeap) Swap(i, j int) { + timers[i], timers[j] = timers[j], timers[i] + timers[i].i = i + timers[j].i = j } -func (eventHeap) Push(x interface{}) { - events = append(events, x.(*event)) +func (timerHeap) Push(x interface{}) { + e := x.(*Timer) + e.i = len(timers) + timers = append(timers, e) } -func (eventHeap) Pop() interface{} { +func (timerHeap) Pop() interface{} { // TODO: possibly shrink array. - n := len(events) - 1 - e := events[n] - events[n] = nil - events = events[0:n] + n := len(timers) - 1 + e := timers[n] + timers[n] = nil + timers = timers[0:n] + e.i = -1 return e } diff --git a/src/pkg/time/sleep_test.go b/src/pkg/time/sleep_test.go index 9e36288f88..4007db561a 100644 --- a/src/pkg/time/sleep_test.go +++ b/src/pkg/time/sleep_test.go @@ -64,6 +64,18 @@ func BenchmarkAfterFunc(b *testing.B) { <-c } +func BenchmarkAfter(b *testing.B) { + for i := 0; i < b.N; i++ { + <-After(1) + } +} + +func BenchmarkStop(b *testing.B) { + for i := 0; i < b.N; i++ { + NewTimer(1e9).Stop() + } +} + func TestAfter(t *testing.T) { const delay = int64(100e6) start := Nanoseconds() @@ -94,6 +106,30 @@ func TestAfterTick(t *testing.T) { } } +func TestAfterStop(t *testing.T) { + const msec = 1e6 + AfterFunc(100*msec, func() {}) + t0 := NewTimer(50 * msec) + c1 := make(chan bool, 1) + t1 := AfterFunc(150*msec, func() { c1 <- true }) + c2 := After(200 * msec) + if !t0.Stop() { + t.Fatalf("failed to stop event 0") + } + if !t1.Stop() { + t.Fatalf("failed to stop event 1") + } + <-c2 + _, ok0 := <-t0.C + _, ok1 := <-c1 + if ok0 || ok1 { + t.Fatalf("events were not stopped") + } + if t1.Stop() { + t.Fatalf("Stop returned true twice") + } +} + var slots = []int{5, 3, 6, 6, 6, 1, 1, 2, 7, 9, 4, 8, 0} type afterResult struct { diff --git a/src/pkg/time/tick_test.go b/src/pkg/time/tick_test.go index 2a63a0f2b3..f2304a14e3 100644 --- a/src/pkg/time/tick_test.go +++ b/src/pkg/time/tick_test.go @@ -43,3 +43,14 @@ func TestTeardown(t *testing.T) { ticker.Stop() } } + +func BenchmarkTicker(b *testing.B) { + ticker := NewTicker(1) + b.ResetTimer() + b.StartTimer() + for i := 0; i < b.N; i++ { + <-ticker.C + } + b.StopTimer() + ticker.Stop() +}