diff --git a/timers/timers.go b/timers/timers.go new file mode 100644 index 00000000..1d96727d --- /dev/null +++ b/timers/timers.go @@ -0,0 +1,246 @@ +// Package timers provides a timer facility similar to the standard library's. +// It's designed to be faster, but less accurate. +package timers + +import ( + "math" + "sync" + "time" + "unsafe" + + "github.com/andres-erbsen/clock" + "github.com/uber-go/atomic" +) + +const cacheline = 64 + +const ( + // State machine for timers. + stateScheduled = iota + stateExpired + stateCanceled +) + +// A Timer is a handle to a deferred operation. +type Timer struct { + f func() + deadline uint64 // in ticks + state atomic.Int32 + next *Timer + tail *Timer // only set on head (TODO: move to bucket) +} + +func newTimer(deadline uint64, f func()) *Timer { + t := &Timer{ + f: f, + deadline: deadline, + } + t.tail = t + return t +} + +// Stop cancels the deferred operation. It returns whether or not the +// cancellation succeeded. +func (t *Timer) Stop() bool { + return t.state.CAS(stateScheduled, stateCanceled) +} + +func (t *Timer) expire() bool { + return t.state.CAS(stateScheduled, stateExpired) +} + +func (t *Timer) pushOne(head *Timer) *Timer { + head.next = t + if t == nil { + head.tail = head + } else { + head.tail = t.tail + t.tail = nil + } + return head +} + +func (t *Timer) push(head *Timer) *Timer { + if head == nil { + return t + } + if t == nil { + return head + } + head.tail.next = t + head.tail = t.tail + t.tail = nil + return head +} + +type bucket struct { + sync.Mutex + timers *Timer + // Avoid false sharing: + // http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html + _ [cacheline - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(&Timer{})]byte +} + +type bucketList struct { + mask uint64 + buckets []bucket +} + +func (bs *bucketList) Schedule(deadline uint64, f func()) *Timer { + t := newTimer(deadline, f) + b := &bs.buckets[deadline&bs.mask] + b.Lock() + b.timers = b.timers.pushOne(t) + b.Unlock() + return t +} + +func (bs *bucketList) Clear() { + for i := range bs.buckets { + b := &bs.buckets[i] + b.Lock() + b.timers = nil + b.Unlock() + } +} + +func (bs *bucketList) GatherExpired(start, end, now uint64) *Timer { + // If we're more than a full rotation behind, just inspect each bucket + // once. + if (end-start == math.MaxUint64) || (int(end-start+1) > len(bs.buckets)) { + start, end = 0, uint64(len(bs.buckets)) + } + + var todo *Timer + for tick := start; tick < end; tick++ { + todo = bs.gatherBucket(tick, now, todo) + } + return todo +} + +func (bs *bucketList) gatherBucket(tick, now uint64, todo *Timer) *Timer { + b := &bs.buckets[tick&bs.mask] + + b.Lock() + batch := b.timers + b.timers = nil + b.Unlock() + + var unexpired *Timer + for batch != nil { + next := batch.next + if batch.deadline > now { + unexpired = unexpired.pushOne(batch) + } else if batch.expire() { + todo = todo.pushOne(batch) + } + batch = next + } + if unexpired != nil { + // We should only hit this case if we're a full wheel rotation + // behind. + b.Lock() + b.timers = b.timers.push(unexpired) + b.Unlock() + } + return todo +} + +// A Wheel schedules and executes deferred operations. +type Wheel struct { + clock clock.Clock + periodExp uint64 + ticker *clock.Ticker + buckets bucketList + stopOnce sync.Once + stop chan struct{} + stopped chan struct{} +} + +// NewWheel creates and starts a new Wheel. +func NewWheel(period, maxTimeout time.Duration) *Wheel { + w := newWheel(period, maxTimeout, clock.New()) + w.start() + return w +} + +func newWheel(period, maxTimeout time.Duration, clock clock.Clock) *Wheel { + tickNanos, power := nextPowerOfTwo(int64(period)) + numBuckets, _ := nextPowerOfTwo(int64(maxTimeout) / tickNanos) + if time.Duration(numBuckets*tickNanos) < maxTimeout { + numBuckets = numBuckets << 1 + } + w := &Wheel{ + clock: clock, + periodExp: power, + ticker: clock.Ticker(time.Duration(tickNanos)), + buckets: bucketList{ + mask: uint64(numBuckets - 1), + buckets: make([]bucket, numBuckets), + }, + stop: make(chan struct{}), + stopped: make(chan struct{}), + } + return w +} + +// Stop shuts down the wheel, blocking until all the background goroutines +// complete. It immediately fires all pending operations, regardless of their +// deadlines. Operations scheduled after the wheel is stopped aren't guaranteed +// to complete. +// +// Stop is safe to call multiple times; calls after the first are no-ops. +func (w *Wheel) Stop() { + // TChannel Channels are safe to close more than once, so wheels should be + // too. + w.stopOnce.Do(func() { + w.ticker.Stop() + close(w.stop) + <-w.stopped + w.buckets.Clear() + }) +} + +// AfterFunc schedules a deferred operation and returns a Timer. +func (w *Wheel) AfterFunc(d time.Duration, f func()) *Timer { + deadline := w.asTick(w.clock.Now().Add(d)) + return w.buckets.Schedule(deadline, f) +} + +func (w *Wheel) start() { + go w.tick(w.clock.Now(), w.ticker.C) +} + +func (w *Wheel) tick(now time.Time, nowCh <-chan time.Time) { + watermark := w.asTick(now) + for { + select { + case now := <-nowCh: + nowTick := w.asTick(now) + todo := w.buckets.GatherExpired(watermark, nowTick, nowTick) + w.fire(todo) + watermark = nowTick + case <-w.stop: + close(w.stopped) + return + } + } +} + +func (w *Wheel) fire(batch *Timer) { + for t := batch; t != nil; t = t.next { + t.f() + } +} + +func (w *Wheel) asTick(t time.Time) uint64 { + return uint64(t.UnixNano() >> w.periodExp) +} + +func nextPowerOfTwo(n int64) (num int64, exponent uint64) { + pow := uint64(0) + for (1 << pow) < n { + pow++ + } + return 1 << pow, pow +} diff --git a/timers/timers_test.go b/timers/timers_test.go new file mode 100644 index 00000000..63e9ad08 --- /dev/null +++ b/timers/timers_test.go @@ -0,0 +1,422 @@ +package timers + +import ( + "math" + "math/rand" + "sync" + "testing" + "time" + "unsafe" + + "github.com/andres-erbsen/clock" + "github.com/stretchr/testify/assert" +) + +// O(n), only for use in tests. +func (t *Timer) len() int { + n := 0 + for el := t; el != nil; el = el.next { + n++ + } + return n +} + +func fakeWheel(tick time.Duration) (*Wheel, *clock.Mock) { + clock := clock.NewMock() + w := newWheel(tick, 2*time.Minute, clock) + w.start() + return w, clock +} + +func newBucketList() *bucketList { + return &bucketList{ + mask: uint64(1<<3 - 1), + buckets: make([]bucket, 1<<3), + } +} + +func assertQueuedTimers(t testing.TB, bs *bucketList, bucketIdx, expected int) { + b := &bs.buckets[bucketIdx] + b.Lock() + defer b.Unlock() + assert.Equal( + t, + expected, + b.timers.len(), + "Unexpected number of counters in bucket %v.", bucketIdx, + ) +} + +func fakeWork() {} // non-nil func to schedule + +func randomTimeouts(n int, max time.Duration) []time.Duration { + r := rand.New(rand.NewSource(time.Now().UnixNano())) + ds := make([]time.Duration, n) + for i := 0; i < n; i++ { + ds[i] = time.Duration(r.Int63n(int64(max))) + } + return ds +} + +func TestTimersLinkedListPushOneNils(t *testing.T) { + var root *Timer + head := newTimer(0, nil) + root = root.pushOne(head) + assert.Equal(t, 1, root.len(), "Unexpected length after pushing one timer to nil root.") + assert.Equal(t, head, root, "Expected pushOne with nil receiver to return head.") + assert.Nil(t, root.next, "Expected one-node list to have a nil next link.") + assert.Equal(t, root, root.tail, "Expected head of single-node list to be its own tail.") + assert.Panics(t, func() { root.pushOne(nil) }, "Expected pushOne'ing a nil to panic.") +} + +func TestTimersLinkedListPushNils(t *testing.T) { + // nil receiver & head + var root *Timer + assert.Equal(t, 0, root.push(nil).len(), "Unexpected length after pushing nil on nil timer.") + + // nil receiver + head := newTimer(0, nil) + root = root.push(head) + assert.Equal(t, 1, root.len(), "Unexpected length after pushing timer to nil root.") + assert.Equal(t, head, root, "Expected push with nil receiver to return head list.") + assert.Nil(t, root.next, "Expected one-node list to have a nil next link.") + assert.Equal(t, root, root.tail, "Expected head of single-node list to be its own tail.") + + // nil head + originalRoot := newTimer(0, nil) + root = originalRoot + root = root.push(nil) + assert.Equal(t, 1, root.len(), "Unexpected length after pushing nil to a len-1 root.") + assert.Equal(t, originalRoot, root, "Expected pushing nil onto a list to return original list.") + assert.Nil(t, root.next, "Expected one-node list to have a nil next link.") + assert.Equal(t, root, root.tail, "Expected head of single-node list to be its own tail.") +} + +func TestTimersLinkedList(t *testing.T) { + els := []*Timer{ + newTimer(0, nil), + newTimer(1, nil), + newTimer(2, nil), + newTimer(3, nil), + newTimer(4, nil), + newTimer(5, nil), + } + // Build a single-node list. + var root *Timer + root = root.pushOne(els[0]) + assert.Equal(t, root, els[0], "Unexpected first node.") + assert.Equal(t, 1, root.len(), "Unexpected length after pushing one element.") + assert.Nil(t, root.next, "Expected one-node list to have a nil next link.") + assert.Equal(t, els[0], root.tail, "Expected head of single-node list to be its own tail.") + + // Add a second element. + root = root.pushOne(els[1]) + assert.Equal(t, root, els[1], "Expected new head to precede existing nodes.") + assert.Equal(t, 2, root.len(), "Expected pushing to list to extend length.") + assert.Equal(t, els[0], root.next, "Expected head node to point to next node.") + assert.Equal(t, els[0], root.tail, "Expected tail of two-node list to point to last node.") + + // Push a list. + root = root.push((*Timer)(nil). + pushOne(els[2]). + pushOne(els[3]). + pushOne(els[4]). + pushOne(els[5])) + assert.Equal(t, root, els[5], "Expected pushing a list to return new head.") + assert.Equal(t, 6, root.len(), "Expected pushing a list to extend length.") + assert.Equal(t, els[4], root.next, "Expected head of list to point to next node.") + assert.Equal(t, els[0], root.tail, "Expected tail of list to point to last node.") + + var deadlines []uint64 + for el := root; el != nil; el = el.next { + deadlines = append(deadlines, el.deadline) + } + assert.Equal(t, []uint64{5, 4, 3, 2, 1, 0}, deadlines, "Unexpected list ordering.") +} + +func TestBucketsAvoidFalseSharing(t *testing.T) { + assert.Equal(t, + int(cacheline), + int(unsafe.Sizeof(bucket{})), + "Expected buckets to exactly fill a CPU cache line.", + ) +} + +func TestBucketListSchedule(t *testing.T) { + bs := newBucketList() + assertQueuedTimers(t, bs, 0, 0) + + bs.Schedule(uint64(0), fakeWork) + assertQueuedTimers(t, bs, 0, 1) + + bs.Schedule(uint64(1), fakeWork) + assertQueuedTimers(t, bs, 1, 1) + + bs.Schedule(uint64(7), fakeWork) + assertQueuedTimers(t, bs, 7, 1) + + bs.Schedule(uint64(8), fakeWork) + assertQueuedTimers(t, bs, 0, 2) +} + +func TestBucketListGatherExpired(t *testing.T) { + bs := newBucketList() + for i := range bs.buckets { + bs.Schedule(uint64(i), fakeWork) + } + assert.Equal(t, 0, bs.GatherExpired(0, 0, 0).len(), "Unexpected no. expired timers in range [0,0).") + assert.Equal(t, 3, bs.GatherExpired(0, 3, 3).len(), "Unexpected no. expired timers in range [0,3).") + assert.Equal(t, 0, bs.GatherExpired(0, 3, 3).len(), "Unexpected no. expired timers in range [0,3) on re-gather.") + assert.Equal(t, 5, bs.GatherExpired(3, 100, 100).len(), "Unexpected no. expired timers in range [3,100).") + + // We should be leaving unexpired timers in place. + bs.Schedule(8, fakeWork) + bs.Schedule(8, fakeWork) + assert.Equal(t, 0, bs.GatherExpired(0, 1, 1).len(), "Unexpected no. expired timers in range [0,1).") + assertQueuedTimers(t, bs, 0, 2) +} + +func TestBucketListClear(t *testing.T) { + bs := newBucketList() + for i := range bs.buckets { + bs.Schedule(uint64(i), fakeWork) + } + bs.Schedule(uint64(1<<20), fakeWork) + bs.Clear() + all := bs.GatherExpired(0, math.MaxUint64, math.MaxUint64) + assert.Equal(t, 0, all.len(), "Unexpected number of timers after clearing bucketList.") + for i := range bs.buckets { + assert.Panics(t, func() { bs.buckets[i].Unlock() }, "Expected all buckets to be unlocked.") + } +} + +func TestTimerBucketCalculations(t *testing.T) { + tests := []struct { + tick, max time.Duration + buckets int + }{ + // If everything is a power of two, buckets = max / tick. + {2, 8, 4}, + // Tick gets bumped up to 1<<3, so we need 3 buckets to support 20ns + // max without overlap. We then bump that to 1<<2. + {6, 20, 4}, + } + + for _, tt := range tests { + w := NewWheel(tt.tick, tt.max) + defer w.Stop() + assert.Equal(t, tt.buckets, len(w.buckets.buckets), "Unexpected number of buckets.") + } +} + +func TestTimersScheduleAndCancel(t *testing.T) { + const ( + numTimers = 20 + tickDuration = 1 << 22 + maxDelay = tickDuration * numTimers + ) + w, c := fakeWheel(tickDuration) + defer w.Stop() + + var wg sync.WaitGroup + scheduled := make([]*Timer, 0, numTimers) + for i := time.Duration(0); i < numTimers; i++ { + scheduled = append(scheduled, w.AfterFunc(i*tickDuration, wg.Done)) + // wg.Done() panics if the counter is less than zero, but these extra + // calls should never fire. + canceled := w.AfterFunc(i*tickDuration, wg.Done) + assert.True(t, canceled.Stop()) + } + + for i := 0; i < numTimers; i++ { + wg.Add(1) + c.Add(tickDuration) + wg.Wait() + } + + for _, timer := range scheduled { + assert.False(t, timer.Stop(), "Shouldn't be able to cancel after expiry.") + } +} + +func TestTimerDroppingTicks(t *testing.T) { + const ( + numTimers = 100 + tickDuration = 1 << 22 + maxDelay = numTimers * tickDuration + ) + now := make(chan time.Time) + defer close(now) + + c := clock.NewMock() + w := newWheel(5*time.Millisecond, 2*time.Minute, c) + go w.tick(time.Unix(0, 0), now) + defer w.Stop() + + var wg sync.WaitGroup + wg.Add(numTimers) + timeouts := randomTimeouts(numTimers, maxDelay) + for _, d := range timeouts { + w.AfterFunc(d, func() { wg.Done() }) + } + now <- time.Unix(0, int64(2*maxDelay)) + wg.Wait() +} + +func TestTimerStopClearsWheel(t *testing.T) { + const numTimers = 100 + w, _ := fakeWheel(1 * time.Millisecond) + + timeouts := randomTimeouts(numTimers, 100*time.Millisecond) + for _, d := range timeouts { + w.AfterFunc(d, fakeWork) + } + + w.Stop() + all := w.buckets.GatherExpired(0, math.MaxUint64, math.MaxUint64) + assert.Equal(t, 0, all.len(), "Expected wheel.Stop to clear all timers.") +} + +func TestPowersOfTwo(t *testing.T) { + tests := []struct { + in int + out int + exponent int + }{ + {-42, 1, 0}, + {0, 1, 0}, + {1, 1, 0}, + {5, 8, 3}, + {1000, 1024, 10}, + {1 << 22, 1 << 22, 22}, + } + for _, tt := range tests { + n, exp := nextPowerOfTwo(int64(tt.in)) + assert.Equal(t, tt.out, int(n), "Unexpected next power of two for input %v", tt.in) + assert.Equal(t, tt.exponent, int(exp), "Unexpected exponent for input %v", tt.in) + } +} + +func scheduleAndCancel(b *testing.B, w *Wheel) { + ds := randomTimeouts(1024, time.Minute) + for i := range ds { + ds[i] = ds[i] + 24*time.Hour + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + w.AfterFunc(ds[i&10], fakeWork).Stop() + i++ + } + }) +} + +func scheduleAndCancelStd(b *testing.B) { + ds := randomTimeouts(1024, time.Minute) + for i := range ds { + ds[i] = ds[i] + 24*time.Hour + } + b.ResetTimer() + + b.RunParallel(func(pb *testing.PB) { + i := 0 + for pb.Next() { + time.AfterFunc(ds[i&10], fakeWork).Stop() + i++ + } + }) +} + +func BenchmarkScheduleAndCancelWheelNoHeap(b *testing.B) { + w := NewWheel(5*time.Millisecond, 2*time.Minute) + defer w.Stop() + + scheduleAndCancel(b, w) + b.StopTimer() +} + +func BenchmarkScheduleAndCancelWheelWithHeap(b *testing.B) { + w := NewWheel(5*time.Millisecond, 2*time.Minute) + defer w.Stop() + + ds := randomTimeouts(10000, time.Minute) + for i := range ds { + ds[i] = ds[i] + 24*time.Hour + } + for i := range ds { + w.AfterFunc(ds[i], fakeWork) + } + + scheduleAndCancel(b, w) + b.StopTimer() +} + +func BenchmarkScheduleAndCancelStandardLibraryNoHeap(b *testing.B) { + scheduleAndCancelStd(b) +} + +func BenchmarkScheduleAndCancelStandardLibraryWithHeap(b *testing.B) { + ds := randomTimeouts(10000, time.Minute) + timers := make([]*time.Timer, 10000) + for i := range ds { + timers[i] = time.AfterFunc(ds[i]+24*time.Hour, fakeWork) + } + + scheduleAndCancelStd(b) + b.StopTimer() + for _, t := range timers { + t.Stop() + } +} + +func BenchmarkWheelWorkThread(b *testing.B) { + // Note that this benchmark includes 1ms of wait time; attempting to reduce + // this by ticking faster is counterproductive, since we start missing + // ticks and then need to lock more buckets to catch up. + w := NewWheel(time.Millisecond, time.Second) + defer w.Stop() + var wg sync.WaitGroup + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + w.AfterFunc(0, func() { wg.Done() }) + wg.Wait() + } +} + +func BenchmarkWheelTimerExpiry(b *testing.B) { + w, c := fakeWheel(time.Millisecond) + defer w.Stop() + var wg sync.WaitGroup + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + w.AfterFunc(time.Millisecond, func() { wg.Done() }) + c.Add(10 * time.Millisecond) + wg.Wait() + } +} + +func BenchmarkStandardLibraryTimerExpiry(b *testing.B) { + // To compare accurately against our timer wheel, we need to create a mock + // clock, listen on it, and advance it. (This actually accounts for the + // vast majority of time spent and memory allocated in this benchmark.) + clock := clock.NewMock() + go func() { + <-clock.Ticker(time.Millisecond).C + }() + + var wg sync.WaitGroup + b.ResetTimer() + + for i := 0; i < b.N; i++ { + wg.Add(1) + time.AfterFunc(0, func() { wg.Done() }) + clock.Add(10 * time.Millisecond) + wg.Wait() + } +}