Skip to content

Commit

Permalink
feat(pkg): add ticker package (#2617)
Browse files Browse the repository at this point in the history
* Add `pkg/ticker`

* Sample ticker usage in evm observer

* Change naming

* Address PR comments

* Address PR comments
  • Loading branch information
swift1337 authored and gartnera committed Aug 15, 2024
1 parent d9263fc commit f873f59
Show file tree
Hide file tree
Showing 3 changed files with 347 additions and 28 deletions.
140 changes: 140 additions & 0 deletions pkg/ticker/ticker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// Package ticker provides a dynamic ticker that can change its interval at runtime.
// The ticker can be stopped gracefully and handles context-based termination.
//
// This package is useful for scenarios where periodic execution of a function is needed
// and the interval might need to change dynamically based on runtime conditions.
//
// It also invokes a first tick immediately after the ticker starts. It's safe to use it concurrently.
//
// It also terminates gracefully when the context is done (return ctx.Err()) or when the stop signal is received.
//
// Example usage:
//
// ticker := New(time.Second, func(ctx context.Context, t *Ticker) error {
// resp, err := client.GetPrice(ctx)
// if err != nil {
// logger.Err(err).Error().Msg("failed to get price")
// return nil
// }
//
// observer.SetPrice(resp.GasPrice)
// t.SetInterval(resp.GasPriceInterval)
//
// return nil
// })
//
// err := ticker.Run(ctx)
package ticker

import (
"context"
"fmt"
"sync"
"time"

"cosmossdk.io/errors"
)

// Ticker represents a ticker that will run a function periodically.
// It also invokes BEFORE ticker starts.
type Ticker struct {
interval time.Duration
ticker *time.Ticker
task Task
signalChan chan struct{}

// runnerMu is a mutex to prevent double run
runnerMu sync.Mutex

// stateMu is a mutex to prevent concurrent SetInterval calls
stateMu sync.Mutex

stopped bool
}

// Task is a function that will be called by the Ticker
type Task func(ctx context.Context, t *Ticker) error

// New creates a new Ticker.
func New(interval time.Duration, runner Task) *Ticker {
return &Ticker{interval: interval, task: runner}
}

// Run creates and runs a new Ticker.
func Run(ctx context.Context, interval time.Duration, task Task) error {
return New(interval, task).Run(ctx)
}

// SecondsFromUint64 converts uint64 to time.Duration in seconds.
func SecondsFromUint64(d uint64) time.Duration {
return time.Duration(d) * time.Second
}

// Run runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts.
// Stops when (if any):
// - context is done (returns ctx.Err())
// - task returns an error or panics
// - shutdown signal is received
func (t *Ticker) Run(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("panic during ticker run: %v", r)
}
}()

// prevent concurrent runs
t.runnerMu.Lock()
defer t.runnerMu.Unlock()

// setup
t.ticker = time.NewTicker(t.interval)
t.signalChan = make(chan struct{})
t.stopped = false

// initial run
if err := t.task(ctx, t); err != nil {
return errors.Wrap(err, "ticker task failed")
}

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-t.ticker.C:
if err := t.task(ctx, t); err != nil {
return errors.Wrap(err, "ticker task failed")
}
case <-t.signalChan:
return nil
}
}
}

// SetInterval updates the interval of the ticker.
func (t *Ticker) SetInterval(interval time.Duration) {
t.stateMu.Lock()
defer t.stateMu.Unlock()

// noop
if t.interval == interval || t.ticker == nil {
return
}

t.interval = interval
t.ticker.Reset(interval)
}

// Stop stops the ticker. Safe to call concurrently or multiple times.
func (t *Ticker) Stop() {
t.stateMu.Lock()
defer t.stateMu.Unlock()

// noop
if t.stopped || t.signalChan == nil {
return
}

close(t.signalChan)
t.stopped = true
t.ticker.Stop()
}
173 changes: 173 additions & 0 deletions pkg/ticker/ticker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package ticker

import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestTicker(t *testing.T) {
const (
dur = time.Millisecond * 100
durSmall = dur / 10
)

t.Run("Basic case with context", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// And a context
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall)
defer cancel()

// And a ticker
ticker := New(dur, func(_ context.Context, t *Ticker) error {
counter++

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)

// two runs: start run + 1 tick
assert.Equal(t, 2, counter)
})

t.Run("Halts when error occurred", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

ctx := context.Background()

// And a ticker func that returns an error after 10 runs
ticker := New(durSmall, func(_ context.Context, t *Ticker) error {
counter++
if counter > 9 {
return fmt.Errorf("oops")
}

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorContains(t, err, "oops")
assert.Equal(t, 10, counter)
})

t.Run("Dynamic interval update", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// Given duration
duration := dur * 10

ctx, cancel := context.WithTimeout(context.Background(), duration)
defer cancel()

// And a ticker what decreases the interval by 2 each time
ticker := New(durSmall, func(_ context.Context, ticker *Ticker) error {
t.Logf("Counter: %d, Duration: %s", counter, duration.String())

counter++
duration /= 2

ticker.SetInterval(duration)

return nil
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)

// It should have run at 2 times with ctxTimeout = tickerDuration (start + 1 tick),
// But it should have run more than that because of the interval decrease
assert.GreaterOrEqual(t, counter, 2)
})

t.Run("Stop ticker", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// And a context
ctx := context.Background()

// And a ticker
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error {
counter++
return nil
})

// And a function with a stop signal
go func() {
time.Sleep(dur)
ticker.Stop()
}()

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.NoError(t, err)
assert.Greater(t, counter, 8)

t.Run("Stop ticker for the second time", func(t *testing.T) {
ticker.Stop()
})
})

t.Run("Panic", func(t *testing.T) {
// ARRANGE
// Given a context
ctx := context.Background()

// And a ticker
ticker := New(durSmall, func(_ context.Context, _ *Ticker) error {
panic("oops")
})

// ACT
err := ticker.Run(ctx)

// ASSERT
assert.ErrorContains(t, err, "panic during ticker run: oops")
})

t.Run("Run as a single call", func(t *testing.T) {
// ARRANGE
// Given a counter
var counter int

// Given a context
ctx, cancel := context.WithTimeout(context.Background(), dur+durSmall)
defer cancel()

tick := func(ctx context.Context, t *Ticker) error {
counter++
return nil
}

// ACT
err := Run(ctx, dur, tick)

// ASSERT
assert.ErrorIs(t, err, context.DeadlineExceeded)
assert.Equal(t, 2, counter)
})
}
Loading

0 comments on commit f873f59

Please sign in to comment.