Skip to content

Commit

Permalink
Add timer wheel package
Browse files Browse the repository at this point in the history
  • Loading branch information
Akshay Shah committed Dec 8, 2016
1 parent 90a6599 commit d42c91b
Show file tree
Hide file tree
Showing 2 changed files with 652 additions and 0 deletions.
244 changes: 244 additions & 0 deletions timers/timers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
// Package timers provides a timer facility similar to the standard library's.
// It's designed to be faster, but less accurate.
package timers

import (
"sync"
"time"
"unsafe"

"github.com/andres-erbsen/clock"
"github.com/uber-go/atomic"
)

const cacheline = 64

const (
// State machine for timers.
stateScheduled = iota
stateExpired
stateCanceled
)

// A Timer is a handle to a deferred operation.
type Timer struct {
f func()
deadline uint64 // in ticks
state atomic.Int32
next *Timer
tail *Timer // only set on head
}

// Stop cancels the deferred operation. It returns whether or not the
// cancellation succeeded.
func (t *Timer) Stop() bool {
if t == nil {
return true // canceling a nil timer always succeeds
}
return t.state.CAS(stateScheduled, stateCanceled)
}

func (t *Timer) expire() bool {
return t.state.CAS(stateScheduled, stateExpired)
}

func (t *Timer) pushOne(head *Timer) *Timer {
head.next, head.tail = nil, nil
return t.push(head)
}

func (t *Timer) push(head *Timer) *Timer {
// link lists
if head == nil {
return t
}
if head.tail == nil {
head.next = t
} else {
head.tail.next = t
}

// set tail
if t == nil {
return head
}
if t.tail == nil {
head.tail = t
} else {
head.tail = t.tail
t.tail = nil
}
return head
}

type bucket struct {
sync.Mutex
timers *Timer
// Avoid false sharing:
// http://mechanical-sympathy.blogspot.com/2011/07/false-sharing.html
_ [cacheline - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(&Timer{})]byte
}

type bucketList struct {
mask uint64
buckets []bucket
}

func (bs *bucketList) Schedule(deadline uint64, f func()) *Timer {
if f == nil {
return nil
}
t := &Timer{
f: f,
deadline: deadline,
}
b := &bs.buckets[deadline&bs.mask]
b.Lock()
b.timers = b.timers.push(t)
b.Unlock()
return t
}

func (bs *bucketList) GatherExpired(start, end, now uint64) *Timer {
// If we're more than a full rotation behind, just inspect each bucket
// once.
if int(end-start+1) > len(bs.buckets) {
start, end = 0, uint64(len(bs.buckets))
}

var todo *Timer
for tick := start; tick < end; tick++ {
todo = bs.gatherBucket(tick, now, todo)
}
return todo
}

func (bs *bucketList) gatherBucket(tick, now uint64, todo *Timer) *Timer {
b := &bs.buckets[tick&bs.mask]

b.Lock()
batch := b.timers
b.timers = nil
b.Unlock()

var unexpired *Timer
for batch != nil {
next := batch.next
if batch.deadline > now {
unexpired = unexpired.pushOne(batch)
} else if batch.expire() {
todo = todo.pushOne(batch)
}
batch = next
}
if unexpired != nil {
// We should only hit this case if we're a full wheel rotation
// behind.
b.Lock()
b.timers = b.timers.push(unexpired)
b.Unlock()
}
return todo
}

func (bs *bucketList) Clear() *Timer {
var todo *Timer
for i := range bs.buckets {
b := &bs.buckets[i]
b.Lock()
todo = todo.push(b.timers)
b.Unlock()
}
return todo
}

// A Wheel schedules and executes deferred operations.
type Wheel struct {
clock clock.Clock
periodExp uint64
ticker *clock.Ticker
buckets bucketList
stopOnce sync.Once
stop chan struct{}
stopped chan struct{}
}

// NewWheel creates and starts a new Wheel.
func NewWheel(period, maxTimeout time.Duration) *Wheel {
return newWheel(period, maxTimeout, clock.New())
}

func newWheel(period, maxTimeout time.Duration, clock clock.Clock) *Wheel {
tickNanos, power := nextPowerOfTwo(int64(period))
numBuckets, _ := nextPowerOfTwo(int64(maxTimeout) / tickNanos)
if time.Duration(numBuckets*tickNanos) < maxTimeout {
numBuckets = numBuckets << 1
}
w := &Wheel{
clock: clock,
periodExp: power,
ticker: clock.Ticker(time.Duration(tickNanos)),
buckets: bucketList{
mask: uint64(numBuckets - 1),
buckets: make([]bucket, numBuckets),
},
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
go w.tick(clock.Now())
return w
}

// Stop shuts down the wheel, blocking until all the background goroutines
// complete.
func (w *Wheel) Stop() {
// TChannel Channels are safe to close more than once, so wheels should be
// too.
w.stopOnce.Do(func() {
w.ticker.Stop()
close(w.stop)
<-w.stopped
todo := w.buckets.Clear()
w.fire(todo)
})
}

// AfterFunc creates a Timer in the wheel.
func (w *Wheel) AfterFunc(d time.Duration, f func()) *Timer {
deadline := w.asTick(w.clock.Now().Add(d))
return w.buckets.Schedule(deadline, f)
}

func (w *Wheel) tick(now time.Time) {
watermark := w.asTick(now)
for {
select {
case now := <-w.ticker.C:
nowTick := w.asTick(now)
todo := w.buckets.GatherExpired(watermark, nowTick, nowTick)
w.fire(todo)
watermark = nowTick
case <-w.stop:
close(w.stopped)
return
}
}
}

func (w *Wheel) fire(batch *Timer) {
for t := batch; t != nil; t = t.next {
t.f()
}
}

func (w *Wheel) asTick(t time.Time) uint64 {
return uint64(t.UnixNano() >> w.periodExp)
}

func nextPowerOfTwo(n int64) (num int64, exponent uint64) {
pow := uint64(0)
for (1 << pow) < n {
pow++
}
return 1 << pow, pow
}
Loading

0 comments on commit d42c91b

Please sign in to comment.