diff --git a/timers/timers.go b/timers/timers.go
new file mode 100644
index 00000000..b5724765
--- /dev/null
+++ b/timers/timers.go
@@ -0,0 +1,244 @@
+// Package timers provides a timer facility similar to the standard library's.
+// It's designed to be faster, but less accurate.
+package timers
+
+import (
+	"sync"
+	"time"
+	"unsafe"
+
+	"github.com/andres-erbsen/clock"
+	"github.com/uber-go/atomic"
+)
+
+const cacheline = 64
+
+const (
+	// State machine for timers.
+	stateScheduled = iota
+	stateExpired
+	stateCanceled
+)
+
+// A Timer is a handle to a deferred operation.
+type Timer struct {
+	f        func()
+	deadline uint64 // in ticks
+	state    atomic.Int32
+	next     *Timer
+	tail     *Timer // only set on head
+}
+
+// Stop cancels the deferred operation. It returns whether or not the
+// cancellation succeeded.
+func (t *Timer) Stop() bool {
+	if t == nil {
+		return true // canceling a nil timer always succeeds
+	}
+	return t.state.CAS(stateScheduled, stateCanceled)
+}
+
+func (t *Timer) expire() bool {
+	return t.state.CAS(stateScheduled, stateExpired)
+}
+
+func (t *Timer) pushOne(head *Timer) *Timer {
+	head.next, head.tail = nil, nil
+	return t.push(head)
+}
+
+func (t *Timer) push(head *Timer) *Timer {
+	// link lists
+	if head == nil {
+		return t
+	}
+	if head.tail == nil {
+		head.next = t
+	} else {
+		head.tail.next = t
+	}
+
+	// set tail
+	if t == nil {
+		return head
+	}
+	if t.tail == nil {
+		head.tail = t
+	} else {
+		head.tail = t.tail
+		t.tail = nil
+	}
+	return head
+}
+
+type bucket struct {
+	sync.Mutex
+	timers *Timer
+	// Avoid false sharing:
+	// http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html
+	_ [cacheline - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(&Timer{})]byte
+}
+
+type bucketList struct {
+	mask    uint64
+	buckets []bucket
+}
+
+func (bs *bucketList) Schedule(deadline uint64, f func()) *Timer {
+	if f == nil {
+		return nil
+	}
+	t := &Timer{
+		f:        f,
+		deadline: deadline,
+	}
+	b := &bs.buckets[deadline&bs.mask]
+	b.Lock()
+	b.timers = b.timers.push(t)
+	b.Unlock()
+	return t
+}
+
+func (bs *bucketList) GatherExpired(start, end, now uint64) *Timer {
+	// If we're more than a full rotation behind, just inspect each bucket
+	// once.
+	if int(end-start+1) > len(bs.buckets) {
+		start, end = 0, uint64(len(bs.buckets))
+	}
+
+	var todo *Timer
+	for tick := start; tick < end; tick++ {
+		todo = bs.gatherBucket(tick, now, todo)
+	}
+	return todo
+}
+
+func (bs *bucketList) gatherBucket(tick, now uint64, todo *Timer) *Timer {
+	b := &bs.buckets[tick&bs.mask]
+
+	b.Lock()
+	batch := b.timers
+	b.timers = nil
+	b.Unlock()
+
+	var unexpired *Timer
+	for batch != nil {
+		next := batch.next
+		if batch.deadline > now {
+			unexpired = unexpired.pushOne(batch)
+		} else if batch.expire() {
+			todo = todo.pushOne(batch)
+		}
+		batch = next
+	}
+	if unexpired != nil {
+		// We should only hit this case if we're a full wheel rotation behind,
+		// or if a timer was scheduled more than a full rotation ahead.
+		b.Lock()
+		b.timers = b.timers.push(unexpired)
+		b.Unlock()
+	}
+	return todo
+}
+
+// LockAndClear locks every bucket, removes any remaining timers, and returns
+// them as a single list. The buckets are deliberately left locked so that no
+// new work can be scheduled on a stopped wheel.
+func (bs *bucketList) LockAndClear() *Timer {
+	var todo *Timer
+	for i := range bs.buckets {
+		b := &bs.buckets[i]
+		b.Lock()
+		todo = todo.push(b.timers)
+		b.timers = nil
+	}
+	return todo
+}
+
+// A Wheel schedules and executes deferred operations.
+type Wheel struct {
+	clock     clock.Clock
+	periodExp uint64
+	ticker    *clock.Ticker
+	buckets   bucketList
+	stopOnce  sync.Once
+	stop      chan struct{}
+	stopped   chan struct{}
+}
+
+// NewWheel creates and starts a new Wheel. The tick period is rounded up to
+// the next power-of-two number of nanoseconds, and enough buckets are
+// allocated to cover maxTimeout without wrapping.
+func NewWheel(period, maxTimeout time.Duration) *Wheel {
+	return newWheel(period, maxTimeout, clock.New())
+}
+
+func newWheel(period, maxTimeout time.Duration, clock clock.Clock) *Wheel {
+	tickNanos, power := nextPowerOfTwo(int64(period))
+	numBuckets, _ := nextPowerOfTwo(int64(maxTimeout) / tickNanos)
+	if time.Duration(numBuckets*tickNanos) < maxTimeout {
+		numBuckets = numBuckets << 1
+	}
+	w := &Wheel{
+		clock:     clock,
+		periodExp: power,
+		ticker:    clock.Ticker(time.Duration(tickNanos)),
+		buckets: bucketList{
+			mask:    uint64(numBuckets - 1),
+			buckets: make([]bucket, numBuckets),
+		},
+		stop:    make(chan struct{}),
+		stopped: make(chan struct{}),
+	}
+	go w.tick(clock.Now())
+	return w
+}
+
+// Stop shuts down the wheel, blocking until the background goroutine exits.
+// Any timers still scheduled in the wheel are fired immediately.
+func (w *Wheel) Stop() {
+	// TChannel Channels are safe to close more than once, so wheels should be
+	// too.
+	w.stopOnce.Do(func() {
+		w.ticker.Stop()
+		close(w.stop)
+		<-w.stopped
+		todo := w.buckets.LockAndClear()
+		w.fire(todo)
+	})
+}
+
+// AfterFunc schedules f to run after roughly d has elapsed, and returns a
+// Timer whose Stop method cancels the call. If f is nil, nothing is scheduled
+// and the returned Timer is nil.
+func (w *Wheel) AfterFunc(d time.Duration, f func()) *Timer {
+	deadline := w.asTick(w.clock.Now().Add(d))
+	return w.buckets.Schedule(deadline, f)
+}
+
+func (w *Wheel) tick(now time.Time) {
+	watermark := w.asTick(now)
+	for {
+		select {
+		case now := <-w.ticker.C:
+			nowTick := w.asTick(now)
+			todo := w.buckets.GatherExpired(watermark, nowTick, nowTick)
+			w.fire(todo)
+			watermark = nowTick
+		case <-w.stop:
+			close(w.stopped)
+			return
+		}
+	}
+}
+
+func (w *Wheel) fire(batch *Timer) {
+	for t := batch; t != nil; t = t.next {
+		// Timers gathered by GatherExpired are already marked expired; timers
+		// collected by LockAndClear may still be scheduled or canceled, so
+		// never fire a successfully canceled timer.
+		if t.state.Load() == stateExpired || t.expire() {
+			t.f()
+		}
+	}
+}
+
+func (w *Wheel) asTick(t time.Time) uint64 {
+	return uint64(t.UnixNano() >> w.periodExp)
+}
+
+func nextPowerOfTwo(n int64) (num int64, exponent uint64) {
+	pow := uint64(0)
+	for (1 << pow) < n {
+		pow++
+	}
+	return 1 << pow, pow
+}
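
Aside (illustrative, not part of the change): the wheel sizing in `newWheel` is easiest to see with concrete numbers. This sketch mirrors that arithmetic for the 5ms period and 2-minute max timeout used by the benchmarks later in this diff; `nextPow2` is a local stand-in for the unexported `nextPowerOfTwo` helper above.

```go
package main

import (
	"fmt"
	"time"
)

// nextPow2 mirrors the unexported nextPowerOfTwo helper in timers.go: it
// rounds n up to a power of two and returns that value and its exponent.
func nextPow2(n int64) (int64, uint64) {
	pow := uint64(0)
	for (1 << pow) < n {
		pow++
	}
	return 1 << pow, pow
}

func main() {
	period := 5 * time.Millisecond // requested tick
	maxTimeout := 2 * time.Minute  // longest supported timeout

	// Mirror newWheel's sizing arithmetic.
	tickNanos, exp := nextPow2(int64(period))
	numBuckets, _ := nextPow2(int64(maxTimeout) / tickNanos)
	if time.Duration(numBuckets*tickNanos) < maxTimeout {
		numBuckets <<= 1
	}

	fmt.Printf("tick: %v (2^%d ns)\n", time.Duration(tickNanos), exp) // 8.388608ms (2^23 ns)
	fmt.Printf("buckets: %d\n", numBuckets)                           // 16384
	fmt.Printf("wheel span: %v\n", time.Duration(numBuckets*tickNanos))
	// A deadline is UnixNano() >> 23 ticks, and its bucket is deadline & (16384-1),
	// so scheduling never needs a division.
}
```

Because tick lengths are powers of two in nanoseconds, `asTick` maps a time to a tick with a single shift, and a tick to a bucket with a single mask.
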
diff --git a/timers/timers_test.go b/timers/timers_test.go
new file mode 100644
index 00000000..deb85c6a
--- /dev/null
+++ b/timers/timers_test.go
@@ -0,0 +1,408 @@
+package timers
+
+import (
+	"math/rand"
+	"sync"
+	"testing"
+	"time"
+	"unsafe"
+
+	"github.com/andres-erbsen/clock"
+	"github.com/stretchr/testify/assert"
+)
+
+// O(n), only for use in tests.
+func (t *Timer) len() int {
+	n := 0
+	for el := t; el != nil; el = el.next {
+		n++
+	}
+	return n
+}
+
+func fakeWheel(tick time.Duration) (*Wheel, *clock.Mock) {
+	clock := clock.NewMock()
+	return newWheel(tick, 2*time.Minute, clock), clock
+}
+
+func newBucketList() *bucketList {
+	return &bucketList{
+		mask:    uint64(1<<3 - 1),
+		buckets: make([]bucket, 1<<3),
+	}
+}
+
+func assertQueuedTimers(t testing.TB, bs *bucketList, bucketIdx, expected int) {
+	b := &bs.buckets[bucketIdx]
+	b.Lock()
+	defer b.Unlock()
+	assert.Equal(
+		t,
+		expected,
+		b.timers.len(),
+		"Unexpected number of timers in bucket %v.", bucketIdx,
+	)
+}
+
+func fakeWork() {} // sentinel non-nil func to schedule
+
+func randomTimeouts(n int, max time.Duration) []time.Duration {
+	r := rand.New(rand.NewSource(time.Now().UnixNano()))
+	ds := make([]time.Duration, n)
+	for i := 0; i < n; i++ {
+		ds[i] = time.Duration(r.Int63n(int64(max)))
+	}
+	return ds
+}
+
+func TestTimersLinkedListNilHandling(t *testing.T) {
+	// nil receiver & head
+	var root *Timer
+	assert.Equal(t, 0, root.len(), "Unexpected length for nil timer.")
+	assert.Equal(t, 0, root.push(nil).len(), "Unexpected length after pushing nil onto a nil timer.")
+
+	// nil receiver
+	head := &Timer{}
+	root = root.push(head)
+	assert.Equal(t, 1, root.len(), "Unexpected length after pushing a timer onto a nil root.")
+	assert.Equal(t, head, root)
+	assert.Nil(t, root.next)
+	assert.Nil(t, root.tail)
+
+	// nil head
+	originalRoot := &Timer{}
+	root = originalRoot
+	root = root.push(nil)
+	assert.Equal(t, 1, root.len(), "Unexpected length after pushing nil onto a len-1 root.")
+	assert.Equal(t, originalRoot, root)
+	assert.Nil(t, root.next)
+	assert.Nil(t, root.tail)
+}
+
+func TestTimersLinkedListPush(t *testing.T) {
+	els := []*Timer{
+		{deadline: 0},
+		{deadline: 1},
+		{deadline: 2},
+		{deadline: 3},
+		{deadline: 4},
+		{deadline: 5},
+	}
+	// Push to a nil list.
+	var root *Timer
+	root = root.push(els[0])
+	assert.Equal(t, root, els[0])
+	assert.Equal(t, 1, root.len())
+
+	// Add a second element.
+	root = root.push(els[1])
+	assert.Equal(t, root, els[1])
+	assert.Equal(t, 2, root.len())
+	assert.Equal(t, els[0], root.next)
+	assert.Equal(t, els[0], root.tail)
+
+	// Push a list.
+	root = root.push((*Timer)(nil).
+		push(els[2]).
+		push(els[3]).
+		push(els[4]).
+		push(els[5]))
+	assert.Equal(t, root, els[5])
+	assert.Equal(t, 6, root.len())
+	assert.Equal(t, els[4], root.next)
+	assert.Equal(t, els[0], root.tail)
+
+	var deadlines []uint64
+	for el := root; el != nil; el = el.next {
+		deadlines = append(deadlines, el.deadline)
+	}
+	assert.Equal(t, []uint64{5, 4, 3, 2, 1, 0}, deadlines, "Unexpected list order.")
+}
+
+func TestBucketsAvoidFalseSharing(t *testing.T) {
+	assert.Equal(t,
+		int(cacheline),
+		int(unsafe.Sizeof(bucket{})),
+		"Expected buckets to exactly fill a CPU cache line.",
+	)
+}
+
+func TestBucketListSchedule(t *testing.T) {
+	bs := newBucketList()
+	assertQueuedTimers(t, bs, 0, 0)
+
+	bs.Schedule(uint64(0), fakeWork)
+	assertQueuedTimers(t, bs, 0, 1)
+
+	bs.Schedule(uint64(1), fakeWork)
+	assertQueuedTimers(t, bs, 1, 1)
+
+	bs.Schedule(uint64(7), fakeWork)
+	assertQueuedTimers(t, bs, 7, 1)
+
+	bs.Schedule(uint64(8), fakeWork)
+	assertQueuedTimers(t, bs, 0, 2)
+}
+
+func TestBucketListGatherExpired(t *testing.T) {
+	bs := newBucketList()
+	for i := range bs.buckets {
+		bs.Schedule(uint64(i), fakeWork)
+	}
+	assert.Equal(t, 0, bs.GatherExpired(0, 0, 0).len(), "Unexpected no. expired timers in range [0,0).")
+	assert.Equal(t, 3, bs.GatherExpired(0, 3, 3).len(), "Unexpected no. expired timers in range [0,3).")
+	assert.Equal(t, 0, bs.GatherExpired(0, 3, 3).len(), "Unexpected no. expired timers in range [0,3) on re-gather.")
+	assert.Equal(t, 5, bs.GatherExpired(3, 100, 100).len(), "Unexpected no. expired timers in range [3,100).")
+
+	// We should be leaving unexpired timers in place.
+	bs.Schedule(8, fakeWork)
+	bs.Schedule(8, fakeWork)
+	assert.Equal(t, 0, bs.GatherExpired(0, 1, 1).len(), "Unexpected no. expired timers in range [0,1).")
+	assertQueuedTimers(t, bs, 0, 2)
+}
+
+func TestBucketListLockAndClear(t *testing.T) {
+	bs := newBucketList()
+	for i := range bs.buckets {
+		bs.Schedule(uint64(i), fakeWork)
+	}
+	bs.Schedule(uint64(1<<20), fakeWork)
+	all := bs.LockAndClear()
+	assert.Equal(t, 9, all.len(), "Unexpected number of timers after clearing bucketList.")
+	for i := range bs.buckets {
+		assert.NotPanics(t, func() { bs.buckets[i].Unlock() }, "Expected all buckets to be locked.")
+	}
+}
+
+func TestTimerBucketCalculations(t *testing.T) {
+	tests := []struct {
+		tick, max time.Duration
+		buckets   int
+	}{
+		// If everything is a power of two, buckets = max / tick.
+		{2, 8, 4},
+		// Tick gets bumped up to 8ns (1<<3), so we need 3 buckets to cover a
+		// 20ns max timeout without overlap; that gets rounded up to the next
+		// power of two, 4 (1<<2).
+		{6, 20, 4},
+	}
+
+	for _, tt := range tests {
+		w := NewWheel(tt.tick, tt.max)
+		defer w.Stop()
+		assert.Equal(t, tt.buckets, len(w.buckets.buckets), "Unexpected number of buckets.")
+	}
+}
+
+func TestTimersScheduleAndCancel(t *testing.T) {
+	const (
+		numTimers    = 20
+		tickDuration = 1 << 22
+		maxDelay     = tickDuration * numTimers
+	)
+	w, c := fakeWheel(tickDuration)
+	defer w.Stop()
+
+	var wg sync.WaitGroup
+	cb := func() { wg.Done() }
+
+	scheduled := make([]*Timer, 0, numTimers)
+	for i := time.Duration(0); i < numTimers; i++ {
+		scheduled = append(scheduled, w.AfterFunc(i*tickDuration, cb))
+		// wg.Done() panics if the counter is less than zero, but these extra
+		// calls should never fire.
+		canceled := w.AfterFunc(i*tickDuration, cb)
+		assert.True(t, canceled.Stop())
+	}
+
+	for elapsed := time.Duration(0); elapsed < maxDelay; elapsed += tickDuration {
+		wg.Add(1)
+		c.Add(tickDuration)
+		wg.Wait()
+	}
+
+	for i := range scheduled {
+		assert.False(t, scheduled[i].Stop(), "Shouldn't be able to cancel after expiry.")
+	}
+}
+
+func TestTimerDroppingTicks(t *testing.T) {
+	const (
+		numTimers    = 100
+		tickDuration = 1 << 22
+		maxDelay     = numTimers * tickDuration
+	)
+	w, c := fakeWheel(tickDuration)
+	defer w.Stop()
+
+	var wg sync.WaitGroup
+	wg.Add(numTimers)
+	timeouts := randomTimeouts(numTimers, maxDelay)
+	for _, d := range timeouts {
+		w.AfterFunc(d, func() { wg.Done() })
+	}
+	c.Add(maxDelay * 2)
+	wg.Wait()
+}
+
+func TestTimerClearAfterStop(t *testing.T) {
+	const numTimers = 100
+	w, _ := fakeWheel(1 * time.Millisecond)
+
+	var wg sync.WaitGroup
+	wg.Add(numTimers)
+	timeouts := randomTimeouts(numTimers, 100*time.Millisecond)
+	for _, d := range timeouts {
+		w.AfterFunc(d, func() { wg.Done() })
+	}
+
+	w.Stop()
+	wg.Wait()
+}
+
+func TestPowersOfTwo(t *testing.T) {
+	tests := []struct {
+		in       int
+		out      int
+		exponent int
+	}{
+		{-42, 1, 0},
+		{0, 1, 0},
+		{1, 1, 0},
+		{5, 8, 3},
+		{1000, 1024, 10},
+		{1 << 22, 1 << 22, 22},
+	}
+	for _, tt := range tests {
+		n, exp := nextPowerOfTwo(int64(tt.in))
+		assert.Equal(t, tt.out, int(n), "Unexpected next power of two for input %v", tt.in)
+		assert.Equal(t, tt.exponent, int(exp), "Unexpected exponent for input %v", tt.in)
+	}
+}
+
+func scheduleAndCancel(b *testing.B, w *Wheel) {
+	ds := randomTimeouts(1024, time.Minute)
+	for i := range ds {
+		ds[i] = ds[i] + 24*time.Hour
+	}
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		i := 0
+		for pb.Next() {
+			w.AfterFunc(ds[i&1023], fakeWork).Stop()
+			i++
+		}
+	})
+}
+
+func scheduleAndCancelStd(b *testing.B) {
+	ds := randomTimeouts(1024, time.Minute)
+	for i := range ds {
+		ds[i] = ds[i] + 24*time.Hour
+	}
+	b.ResetTimer()
+
+	b.RunParallel(func(pb *testing.PB) {
+		i := 0
+		for pb.Next() {
+			time.AfterFunc(ds[i&1023], fakeWork).Stop()
+			i++
+		}
+	})
+}
+
+func BenchmarkTimerScheduleAndCancelNoHeap(b *testing.B) {
+	w := NewWheel(5*time.Millisecond, 2*time.Minute)
+	defer w.Stop()
+
+	scheduleAndCancel(b, w)
+	b.StopTimer()
+}
+
+func BenchmarkTimerScheduleAndCancelWithHeap(b *testing.B) {
+	w := NewWheel(5*time.Millisecond, 2*time.Minute)
+	defer w.Stop()
+
+	ds := randomTimeouts(10000, time.Minute)
+	for i := range ds {
+		ds[i] = ds[i] + 24*time.Hour
+	}
+	for i := range ds {
+		w.AfterFunc(ds[i], fakeWork)
+	}
+
+	scheduleAndCancel(b, w)
+	b.StopTimer()
+}
+
+func BenchmarkScheduleAndCancelStandardLibraryNoHeap(b *testing.B) {
+	scheduleAndCancelStd(b)
+}
+
+func BenchmarkScheduleAndCancelStandardLibraryWithHeap(b *testing.B) {
+	ds := randomTimeouts(10000, time.Minute)
+	timers := make([]*time.Timer, 10000)
+	for i := range ds {
+		ds[i] = ds[i] + 24*time.Hour
+	}
+	for i := range ds {
+		timers[i] = time.AfterFunc(ds[i], fakeWork)
+	}
+
+	scheduleAndCancelStd(b)
+	b.StopTimer()
+	for _, t := range timers {
+		t.Stop()
+	}
+}
+
+func BenchmarkTimerWorkThread(b *testing.B) {
+	// Note that this benchmark includes 1ms of wait time; attempting to reduce
+	// this by ticking faster is counterproductive, since we start missing
+	// ticks and then need to lock more buckets to catch up.
+	w := NewWheel(time.Millisecond, time.Second)
+	defer w.Stop()
+	var wg sync.WaitGroup
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		w.AfterFunc(0, func() { wg.Done() })
+		wg.Wait()
+	}
+}
+
+func BenchmarkTimerExpiry(b *testing.B) {
+	w, c := fakeWheel(time.Millisecond)
+	defer w.Stop()
+	var wg sync.WaitGroup
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		w.AfterFunc(time.Millisecond, func() { wg.Done() })
+		c.Add(10 * time.Millisecond)
+		wg.Wait()
+	}
+}
+
+func BenchmarkStandardLibraryTimerExpiry(b *testing.B) {
+	// To compare accurately against our timer wheel, we need to create a mock
+	// clock, listen on it, and advance it. (This actually accounts for the
+	// vast majority of time spent and memory allocated in this benchmark.)
+	clock := clock.NewMock()
+	go func() {
+		<-clock.Ticker(time.Millisecond).C
+	}()
+
+	var wg sync.WaitGroup
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		wg.Add(1)
+		time.AfterFunc(0, func() { wg.Done() })
+		clock.Add(10 * time.Millisecond)
+		wg.Wait()
+	}
+}
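
Aside (illustrative, not part of the change): a minimal usage sketch of the API added above. The import path is a placeholder, since the diff only shows the package's relative location; the behavior shown (NewWheel, AfterFunc, Timer.Stop, and Wheel.Stop firing still-scheduled timers) comes from timers.go.

```go
package main

import (
	"fmt"
	"time"

	// Placeholder import path; substitute the actual module path of the
	// timers package introduced in this diff.
	"example.com/yourrepo/timers"
)

func main() {
	// A wheel that ticks roughly every 5ms and supports timeouts up to 2 minutes.
	// The period is rounded up to a power-of-two number of nanoseconds (~8.4ms here).
	w := timers.NewWheel(5*time.Millisecond, 2*time.Minute)
	// Stopping the wheel fires any timers that are still scheduled.
	defer w.Stop()

	done := make(chan struct{})
	t := w.AfterFunc(20*time.Millisecond, func() {
		fmt.Println("fired")
		close(done)
	})

	// Stop reports whether cancellation succeeded; if it didn't, the callback
	// has fired (or is about to), so wait for it.
	if !t.Stop() {
		<-done
	}
}
```

Callbacks run synchronously on the wheel's single tick goroutine, so they should either be short or hand work off to another goroutine.
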