From 81100b7b7771148c021f3cef01bb6a9002135468 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Wed, 18 Dec 2024 20:12:48 +0100 Subject: [PATCH 01/11] Implement ticker.StopBlocking() --- pkg/ticker/ticker.go | 52 ++++++++++++++---- pkg/ticker/ticker_test.go | 110 +++++++++++++++++++++++++++++++++++++- 2 files changed, 149 insertions(+), 13 deletions(-) diff --git a/pkg/ticker/ticker.go b/pkg/ticker/ticker.go index ebac3e1a96..01125dee7c 100644 --- a/pkg/ticker/ticker.go +++ b/pkg/ticker/ticker.go @@ -56,6 +56,7 @@ type Ticker struct { stopped bool ctxCancel context.CancelFunc + runCompleteChan chan struct{} externalStopChan <-chan struct{} logger zerolog.Logger } @@ -103,7 +104,16 @@ func Run(ctx context.Context, interval time.Duration, task Task, opts ...Opt) er // - task returns an error or panics // - shutdown signal is received func (t *Ticker) Run(ctx context.Context) (err error) { + // prevent concurrent runs + t.runnerMu.Lock() + defer t.runnerMu.Unlock() + + ctx = t.setStartState(ctx) + defer func() { + // used in StopBlocking() + close(t.runCompleteChan) + if r := recover(); r != nil { stack := string(debug.Stack()) lines := strings.Split(stack, "\n") @@ -116,21 +126,14 @@ func (t *Ticker) Run(ctx context.Context) (err error) { } }() - // prevent concurrent runs - t.runnerMu.Lock() - defer t.runnerMu.Unlock() - - // setup - ctx, t.ctxCancel = context.WithCancel(ctx) - t.ticker = time.NewTicker(t.interval) - t.stopped = false - // initial run - if err := t.task(ctx, t); err != nil { + if err = t.task(ctx, t); err != nil { t.Stop() return fmt.Errorf("ticker task failed (initial run): %w", err) } + defer t.setStopState() + for { select { case <-ctx.Done(): @@ -172,8 +175,35 @@ func (t *Ticker) SetInterval(interval time.Duration) { t.ticker.Reset(interval) } -// Stop stops the ticker. Safe to call concurrently or multiple times. +// Stop stops the ticker in a NON-blocking way. 
If the task is running in a separate goroutine, +// this call *might* not wait for it to finish. To wait for task finish, use StopBlocking(). +// It's safe to call Stop() multiple times / concurrently / within the task. func (t *Ticker) Stop() { + t.setStopState() +} + +// StopBlocking stops the ticker in a blocking way i.e. it waits for the task to finish. +// DO NOT call this within the task. +func (t *Ticker) StopBlocking() { + t.setStopState() + <-t.runCompleteChan +} + +func (t *Ticker) setStartState(ctx context.Context) context.Context { + t.stateMu.Lock() + defer t.stateMu.Unlock() + + ctx, t.ctxCancel = context.WithCancel(ctx) + t.ticker = time.NewTicker(t.interval) + t.stopped = false + + // this signals that Run() is about to return + t.runCompleteChan = make(chan struct{}) + + return ctx +} + +func (t *Ticker) setStopState() { t.stateMu.Lock() defer t.stateMu.Unlock() diff --git a/pkg/ticker/ticker_test.go b/pkg/ticker/ticker_test.go index 4e03a28d4f..e7d0cf1ffe 100644 --- a/pkg/ticker/ticker_test.go +++ b/pkg/ticker/ticker_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "sync/atomic" "testing" "time" @@ -19,6 +20,8 @@ func TestTicker(t *testing.T) { ) t.Run("Basic case with context", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a counter var counter int @@ -45,6 +48,8 @@ func TestTicker(t *testing.T) { }) t.Run("Halts when error occurred", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a counter var counter int @@ -70,6 +75,8 @@ func TestTicker(t *testing.T) { }) t.Run("Dynamic interval update", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a counter var counter int @@ -104,6 +111,8 @@ func TestTicker(t *testing.T) { }) t.Run("Stop ticker", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a counter var counter int @@ -135,7 +144,96 @@ func TestTicker(t *testing.T) { }) }) + t.Run("Stop ticker in a blocking fashion", func(t *testing.T) { + t.Parallel() + + const ( + tickerInterval = 100 * 
time.Millisecond + workDuration = 600 * time.Millisecond + stopAfterStart = workDuration + tickerInterval/2 + ) + + newLogger := func(t *testing.T) zerolog.Logger { + return zerolog.New(zerolog.NewTestWriter(t)).With().Timestamp().Logger() + } + + // test task that imitates some work + newTask := func(counter *int32, logger zerolog.Logger) Task { + return func(ctx context.Context, _ *Ticker) error { + logger.Info().Msg("Tick start") + atomic.AddInt32(counter, 1) + + time.Sleep(workDuration) + + logger.Info().Msgf("Tick end") + atomic.AddInt32(counter, -1) + + return nil + } + } + + t.Run("Non-blocking stop fails to finish the work", func(t *testing.T) { + t.Parallel() + + // ARRANGE + // Given some test task that imitates some work + testLogger := newLogger(t) + counter := int32(0) + task := newTask(&counter, testLogger) + + // Given a ticker + ticker := New(tickerInterval, task, WithLogger(testLogger, "test-non-blocking-ticker")) + + // ACT + // Imitate the ticker run in the background + go func() { + err := ticker.Run(context.Background()) + require.NoError(t, err) + }() + + // Then stop the ticker after some delay + time.Sleep(stopAfterStart) + testLogger.Info().Msg("Stopping ticker") + ticker.Stop() + testLogger.Info().Msg("Stopped ticker") + + // ASSERT + // If the ticker is stopped BEFORE the work is done i.e. "in the middle of work", + // the counter would be `1`. 
You can also check the logs + assert.Equal(t, int32(1), counter) + }) + + t.Run("Blocking stop works as expected", func(t *testing.T) { + t.Parallel() + + // ARRANGE + // Now if we have the SAME test but with blocking stop, it should work + testLogger := newLogger(t) + counter := int32(0) + task := newTask(&counter, testLogger) + + ticker := New(tickerInterval, task, WithLogger(testLogger, "test-non-blocking-ticker")) + + // ACT + go func() { + err := ticker.Run(context.Background()) + require.NoError(t, err) + }() + + time.Sleep(stopAfterStart) + testLogger.Info().Msg("Stopping ticker") + ticker.StopBlocking() + testLogger.Info().Msg("Stopped ticker") + + // ASSERT + // If ticker is stopped AFTER the work is done + assert.Equal(t, int32(0), counter) + }) + }) + t.Run("Panic", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a context ctx := context.Background() @@ -151,10 +249,12 @@ func TestTicker(t *testing.T) { // ASSERT assert.ErrorContains(t, err, "panic during ticker run: oops") // assert that we get error with the correct line number - assert.ErrorContains(t, err, "ticker_test.go:145") + assert.ErrorContains(t, err, "ticker_test.go:243") }) t.Run("Nil panic", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a context ctx := context.Background() @@ -176,10 +276,12 @@ func TestTicker(t *testing.T) { "panic during ticker run: runtime error: invalid memory address or nil pointer dereference", ) // assert that we get error with the correct line number - assert.ErrorContains(t, err, "ticker_test.go:165") + assert.ErrorContains(t, err, "ticker_test.go:265") }) t.Run("Run as a single call", func(t *testing.T) { + t.Parallel() + // ARRANGE // Given a counter var counter int @@ -202,6 +304,8 @@ func TestTicker(t *testing.T) { }) t.Run("With stop channel", func(t *testing.T) { + t.Parallel() + // ARRANGE var ( tickerInterval = 100 * time.Millisecond @@ -232,6 +336,8 @@ func TestTicker(t *testing.T) { }) t.Run("With logger", func(t *testing.T) { + 
t.Parallel() + // ARRANGE out := &bytes.Buffer{} logger := zerolog.New(out) From e6c9fc7127b0f0e57db79cda3ac8f21db73c3ad4 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 19 Dec 2024 11:25:39 +0100 Subject: [PATCH 02/11] Add Scheduler package --- pkg/scheduler/scheduler.go | 223 +++++++++++++++++++++++++++++++++++++ 1 file changed, 223 insertions(+) create mode 100644 pkg/scheduler/scheduler.go diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go new file mode 100644 index 0000000000..c2940ec3f3 --- /dev/null +++ b/pkg/scheduler/scheduler.go @@ -0,0 +1,223 @@ +// Package scheduler provides a background task scheduler that allows for the registration, +// execution, and management of periodic tasks. Tasks can be grouped, named, and configured +// with various options such as custom intervals, log fields, and skip conditions. +// +// The scheduler supports dynamic interval updates and can gracefully stop tasks either +// individually or by group. +package scheduler + +import ( + "context" + "sync" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog" + + "github.com/zeta-chain/node/pkg/bg" + "github.com/zeta-chain/node/pkg/ticker" +) + +// Scheduler represents background task scheduler. +type Scheduler struct { + definitions map[uuid.UUID]*Definition + mu sync.RWMutex + logger zerolog.Logger +} + +type Task func(ctx context.Context) error + +type Group string + +const DefaultGroup = Group("default") + +type Definition struct { + scheduler *Scheduler + + id uuid.UUID + group Group + name string + task Task + ticker *ticker.Ticker + + interval time.Duration + intervalUpdater func() time.Duration + skipper func() bool + + logFields map[string]any + logger zerolog.Logger + + // todo block subscriber (on zeta-chain new block) +} + +// New Scheduler instance. 
+func New(logger zerolog.Logger) *Scheduler { + return &Scheduler{ + definitions: make(map[uuid.UUID]*Definition), + logger: logger.With().Str("module", "scheduler").Logger(), + } +} + +// Opt Definition option +type Opt func(*Definition) + +// Name sets task name. +func Name(name string) Opt { + return func(d *Definition) { d.name = name } +} + +func GroupName(group Group) Opt { + return func(d *Definition) { d.group = group } +} + +// LogFields augments definition logger with some fields. +func LogFields(fields map[string]any) Opt { + return func(d *Definition) { d.logFields = fields } +} + +// Interval sets initial task interval. +func Interval(interval time.Duration) Opt { + return func(d *Definition) { d.interval = interval } +} + +// Skipper sets task skipper function +func Skipper(skipper func() bool) Opt { + return func(d *Definition) { d.skipper = skipper } +} + +// IntervalUpdater sets interval updater function. +func IntervalUpdater(intervalUpdater func() time.Duration) Opt { + return func(d *Definition) { d.intervalUpdater = intervalUpdater } +} + +// Register registers and starts new task in the background +func (s *Scheduler) Register(ctx context.Context, task Task, opts ...Opt) *Definition { + id := uuid.New() + def := &Definition{ + scheduler: s, + id: id, + group: DefaultGroup, + name: id.String(), + task: task, + interval: time.Second, + } + for _, opt := range opts { + opt(def) + } + + logOpts := s.logger.With(). + Str("task.name", def.name). + Str("task.group", string(def.group)) + + if len(def.logFields) > 0 { + logOpts = logOpts.Fields(def.logFields) + } + + def.logger = logOpts.Logger() + + defTicker := def.provisionTicker(task) + + bgTask := func(ctx context.Context) error { + return defTicker.Run(ctx) + } + + s.mu.Lock() + s.definitions[id] = def + s.mu.Unlock() + + // Run async worker + bg.Work(ctx, bgTask, bg.WithLogger(def.logger), bg.WithName(def.name)) + + return def +} + +// Stop stops all tasks. 
+func (s *Scheduler) Stop() { + s.StopGroup("") +} + +// StopGroup stops all tasks in the group. +func (s *Scheduler) StopGroup(group Group) { + var selectedDefs []*Definition + + s.mu.RLock() + + // Filter desired definitions + for _, def := range s.definitions { + // "" is for wildcard i.e. all groups + if group == "" || def.group == group { + selectedDefs = append(selectedDefs, def) + } + } + + s.mu.RUnlock() + + if len(selectedDefs) == 0 { + return + } + + // Stop all selected tasks concurrently + var wg sync.WaitGroup + wg.Add(len(selectedDefs)) + + for _, def := range selectedDefs { + go func(def *Definition) { + defer wg.Done() + def.Stop() + }(def) + } + + wg.Wait() +} + +// Stop stops the task and offloads it from the scheduler. +func (d *Definition) Stop() { + start := time.Now() + d.logger.Info().Msg("Stopping scheduler task") + d.ticker.StopBlocking() + d.logger.Info().Dur("time_taken", time.Since(start)).Msg("Stopped scheduler task") + + // delete definition from scheduler + d.scheduler.mu.Lock() + delete(d.scheduler.definitions, d.id) + d.scheduler.mu.Unlock() +} + +func (d *Definition) provisionTicker(task Task) *ticker.Ticker { + d.ticker = ticker.New( + d.interval, + d.tickerTask(task), + ticker.WithLogger(d.logger, d.name), + ) + + return d.ticker +} + +// tickerTask wraps Task to be executed by ticker.Ticker +func (d *Definition) tickerTask(task Task) ticker.Task { + // todo metrics + // - duration + // - outcome (skip, err, ok) + // - bump invocation counter + + return func(ctx context.Context, t *ticker.Ticker) error { + // skip tick + if d.skipper != nil && d.skipper() { + return nil + } + + err := task(ctx) + + if err != nil { + d.logger.Error().Err(err).Msg("task failed") + return nil + } + + if d.intervalUpdater != nil { + // noop if interval is not changed + t.SetInterval(d.intervalUpdater()) + } + + return nil + } +} From 6dc00732b19a47143d36820c72502bd51e3cf0db Mon Sep 17 00:00:00 2001 From: Dmitry S 
<11892559+swift1337@users.noreply.github.com> Date: Thu, 19 Dec 2024 14:01:48 +0100 Subject: [PATCH 03/11] Scheduler improvements & test coverage --- pkg/scheduler/opts.go | 45 ++++++ pkg/scheduler/scheduler.go | 182 +++++++++++----------- pkg/scheduler/scheduler_test.go | 262 ++++++++++++++++++++++++++++++++ 3 files changed, 404 insertions(+), 85 deletions(-) create mode 100644 pkg/scheduler/opts.go create mode 100644 pkg/scheduler/scheduler_test.go diff --git a/pkg/scheduler/opts.go b/pkg/scheduler/opts.go new file mode 100644 index 0000000000..56c1ec9d35 --- /dev/null +++ b/pkg/scheduler/opts.go @@ -0,0 +1,45 @@ +package scheduler + +import ( + "time" +) + +// Opt Definition option +type Opt func(*Definition) + +// Name sets task name. +func Name(name string) Opt { + return func(d *Definition) { d.name = name } +} + +// GroupName sets task group. Otherwise, defaults to DefaultGroup. +func GroupName(group Group) Opt { + return func(d *Definition) { d.group = group } +} + +// LogFields augments definition logger with some fields. +func LogFields(fields map[string]any) Opt { + return func(d *Definition) { d.logFields = fields } +} + +// Interval sets initial task interval. +func Interval(interval time.Duration) Opt { + return func(d *Definition) { d.interval = interval } +} + +// Skipper sets task skipper function +func Skipper(skipper func() bool) Opt { + return func(d *Definition) { d.skipper = skipper } +} + +// IntervalUpdater sets interval updater function. +func IntervalUpdater(intervalUpdater func() time.Duration) Opt { + return func(d *Definition) { d.intervalUpdater = intervalUpdater } +} + +// todo +// BlockListener alter behaviour to listen for new blocks instead of using a ticker. +// IntervalUpdater is ignored. 
+//func BlockListener(listener <-chan cometbft.EventDataNewBlock) Opt { +// return func(d *Definition) { d.blockListener = listener } +//} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index c2940ec3f3..9db57284ac 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -25,29 +25,39 @@ type Scheduler struct { logger zerolog.Logger } +// Task represents scheduler's task type Task func(ctx context.Context) error +// Group represents Definition group. +// Definitions can be grouped for easier management. type Group string +// DefaultGroup is the default group for definitions. const DefaultGroup = Group("default") +// Definition represents a configuration of a Task type Definition struct { + // ref to the Scheduler is required scheduler *Scheduler - id uuid.UUID - group Group - name string - task Task - ticker *ticker.Ticker + // naming stuff + id uuid.UUID + group Group + name string + // arbitrary function that will be invoked by the scheduler + task Task + + // properties for ticker / blockChan ticker + ticker *ticker.Ticker + blockChanTicker *blockChanTicker interval time.Duration intervalUpdater func() time.Duration skipper func() bool + // logging logFields map[string]any logger zerolog.Logger - - // todo block subscriber (on zeta-chain new block) } // New Scheduler instance. @@ -58,38 +68,6 @@ func New(logger zerolog.Logger) *Scheduler { } } -// Opt Definition option -type Opt func(*Definition) - -// Name sets task name. -func Name(name string) Opt { - return func(d *Definition) { d.name = name } -} - -func GroupName(group Group) Opt { - return func(d *Definition) { d.group = group } -} - -// LogFields augments definition logger with some fields. -func LogFields(fields map[string]any) Opt { - return func(d *Definition) { d.logFields = fields } -} - -// Interval sets initial task interval. 
-func Interval(interval time.Duration) Opt { - return func(d *Definition) { d.interval = interval } -} - -// Skipper sets task skipper function -func Skipper(skipper func() bool) Opt { - return func(d *Definition) { d.skipper = skipper } -} - -// IntervalUpdater sets interval updater function. -func IntervalUpdater(intervalUpdater func() time.Duration) Opt { - return func(d *Definition) { d.intervalUpdater = intervalUpdater } -} - // Register registers and starts new task in the background func (s *Scheduler) Register(ctx context.Context, task Task, opts ...Opt) *Definition { id := uuid.New() @@ -105,29 +83,14 @@ func (s *Scheduler) Register(ctx context.Context, task Task, opts ...Opt) *Defin opt(def) } - logOpts := s.logger.With(). - Str("task.name", def.name). - Str("task.group", string(def.group)) + def.logger = newDefinitionLogger(def, s.logger) - if len(def.logFields) > 0 { - logOpts = logOpts.Fields(def.logFields) - } - - def.logger = logOpts.Logger() - - defTicker := def.provisionTicker(task) - - bgTask := func(ctx context.Context) error { - return defTicker.Run(ctx) - } + def.startTicker(ctx) s.mu.Lock() s.definitions[id] = def s.mu.Unlock() - // Run async worker - bg.Work(ctx, bgTask, bg.WithLogger(def.logger), bg.WithName(def.name)) - return def } @@ -173,45 +136,55 @@ func (s *Scheduler) StopGroup(group Group) { // Stop stops the task and offloads it from the scheduler. 
func (d *Definition) Stop() { start := time.Now() - d.logger.Info().Msg("Stopping scheduler task") - d.ticker.StopBlocking() - d.logger.Info().Dur("time_taken", time.Since(start)).Msg("Stopped scheduler task") // delete definition from scheduler - d.scheduler.mu.Lock() - delete(d.scheduler.definitions, d.id) - d.scheduler.mu.Unlock() -} + defer func() { + d.scheduler.mu.Lock() + delete(d.scheduler.definitions, d.id) + d.scheduler.mu.Unlock() + d.logger.Info().Int64("time_taken_ms", time.Since(start).Milliseconds()).Msg("Stopped task") + }() -func (d *Definition) provisionTicker(task Task) *ticker.Ticker { - d.ticker = ticker.New( - d.interval, - d.tickerTask(task), - ticker.WithLogger(d.logger, d.name), - ) + d.logger.Info().Msg("Stopping scheduler task") + + if d.isTickerBasedTask() { + d.ticker.StopBlocking() + return + } - return d.ticker + // todo stop block chan ticker } -// tickerTask wraps Task to be executed by ticker.Ticker -func (d *Definition) tickerTask(task Task) ticker.Task { - // todo metrics - // - duration - // - outcome (skip, err, ok) - // - bump invocation counter +func (d *Definition) isTickerBasedTask() bool { + // todo + return true +} - return func(ctx context.Context, t *ticker.Ticker) error { - // skip tick - if d.skipper != nil && d.skipper() { - return nil +func (d *Definition) startTicker(ctx context.Context) { + if d.isTickerBasedTask() { + d.ticker = ticker.New( + d.interval, + d.tickerTask(), + ticker.WithLogger(d.logger, d.name), + ) + + bgTask := func(ctx context.Context) error { + d.logger.Info().Msg("Starting task") + return d.ticker.Run(ctx) } - err := task(ctx) + // Run async worker (no need for logger here) + bg.Work(ctx, bgTask) + return + } - if err != nil { - d.logger.Error().Err(err).Msg("task failed") - return nil - } + // todo start block chan ticker +} + +// tickerTask wraps Task to be executed by ticker.Ticker +func (d *Definition) tickerTask() ticker.Task { + return func(ctx context.Context, t *ticker.Ticker) error { 
+ d.invoke(ctx) if d.intervalUpdater != nil { // noop if interval is not changed @@ -221,3 +194,42 @@ func (d *Definition) tickerTask(task Task) ticker.Task { return nil } } + +// invoke executes a given Task with logging & telemetry. +func (d *Definition) invoke(ctx context.Context) { + // skip tick + if d.skipper != nil && d.skipper() { + return + } + + d.logger.Debug().Msg("Invoking task") + // todo metrics + // - duration + // - outcome (skip, err, ok) + // - bump invocation counter + + err := d.task(ctx) + + if err != nil { + d.logger.Error().Err(err).Msg("task failed") + } +} + +func newDefinitionLogger(def *Definition, logger zerolog.Logger) zerolog.Logger { + logOpts := logger.With(). + Str("task.name", def.name). + Str("task.group", string(def.group)) + + if len(def.logFields) > 0 { + logOpts = logOpts.Fields(def.logFields) + } + + taskType := "interval_ticker" + if def.blockChanTicker != nil { + taskType = "block_ticker" + } + + logOpts.Str("task.type", taskType) + + return logOpts.Logger() +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go new file mode 100644 index 0000000000..71cf7ebdc3 --- /dev/null +++ b/pkg/scheduler/scheduler_test.go @@ -0,0 +1,262 @@ +package scheduler + +import ( + "bytes" + "context" + "fmt" + "io" + "sync/atomic" + "testing" + "time" + + "github.com/rs/zerolog" + "github.com/stretchr/testify/assert" +) + +func TestScheduler(t *testing.T) { + t.Run("Basic case", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + var counter int32 + + task := func(ctx context.Context) error { + atomic.AddInt32(&counter, 1) + return nil + } + + // ACT + // Register task and stop it after x1.5 interval. + ts.scheduler.Register(ts.ctx, task) + time.Sleep(1500 * time.Millisecond) + ts.scheduler.Stop() + + // ASSERT + // Counter should be 2 because we invoke a task once on a start, + // once after 1 second (default interval), + // and then at T=1.5s we stop the scheduler. 
+ assert.Equal(t, int32(2), counter) + + // Check logs + assert.Contains(t, ts.logBuffer.String(), "Stopped task") + assert.Contains(t, ts.logBuffer.String(), `"task.group":"default"`) + }) + + t.Run("More opts", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + var counter int32 + + task := func(ctx context.Context) error { + atomic.AddInt32(&counter, 1) + return nil + } + + // ACT + // Register task and stop it after x1.5 interval. + ts.scheduler.Register( + ts.ctx, + task, + Name("counter-inc"), + GroupName("my-custom-group"), + Interval(300*time.Millisecond), + LogFields(map[string]any{ + "blockchain": "doge", + "validators": []string{"alice", "bob"}, + }), + ) + + time.Sleep(time.Second) + ts.scheduler.Stop() + + // ASSERT + // Counter should be 1 + 1000/300 = 4 (first run + interval runs) + assert.Equal(t, int32(4), counter) + + // Also check that log fields are present + assert.Contains(t, ts.logBuffer.String(), `"task.name":"counter-inc","task.group":"my-custom-group"`) + assert.Contains(t, ts.logBuffer.String(), `"blockchain":"doge","validators":["alice","bob"]`) + }) + + t.Run("Definition can also stop itself", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + var counter int32 + + task := func(ctx context.Context) error { + atomic.AddInt32(&counter, 1) + return nil + } + + // ACT + // Register task and stop it after x1.5 interval. + def := ts.scheduler.Register(ts.ctx, task, Interval(300*time.Millisecond)) + + time.Sleep(time.Second) + def.Stop() + + // ASSERT + // Counter should be 1 + 1000/300 = 4 (first run + interval runs) + assert.Equal(t, int32(4), counter) + }) + + t.Run("Skipper option", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + var counter int32 + + task := func(ctx context.Context) error { + atomic.AddInt32(&counter, 1) + return nil + } + + const maxValue = 5 + + // Skipper function that drops the task after reaching a certain counter value. 
+ skipper := func() bool { + allowed := atomic.LoadInt32(&counter) < maxValue + return !allowed + } + + // ACT + // Register task and stop it after x1.5 interval. + def := ts.scheduler.Register(ts.ctx, task, Interval(50*time.Millisecond), Skipper(skipper)) + + time.Sleep(time.Second) + def.Stop() + + // ASSERT + assert.Equal(t, int32(maxValue), counter) + }) + + t.Run("IntervalUpdater option", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + var counter int32 + + task := func(ctx context.Context) error { + atomic.AddInt32(&counter, 1) + return nil + } + + // Interval updater that increases the interval by 50ms on each counter increment. + intervalUpdater := func() time.Duration { + return time.Duration(atomic.LoadInt32(&counter)) * 50 * time.Millisecond + } + + // ACT + // Register task and stop it after x1.5 interval. + def := ts.scheduler.Register(ts.ctx, task, Interval(time.Millisecond), IntervalUpdater(intervalUpdater)) + + time.Sleep(time.Second) + def.Stop() + + // ASSERT + assert.Equal(t, int32(6), counter) + + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":1,"ticker.new_interval":50`) + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":50,"ticker.new_interval":100`) + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":100,"ticker.new_interval":150`) + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":150,"ticker.new_interval":200`) + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":200,"ticker.new_interval":250`) + assert.Contains(t, ts.logBuffer.String(), `"ticker.old_interval":250,"ticker.new_interval":300`) + }) + + t.Run("Multiple tasks in different groups", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + // Given multiple tasks + var counterA, counterB, counterC int32 + + // Two tasks for Alice + taskAliceA := func(ctx context.Context) error { + atomic.AddInt32(&counterA, 1) + time.Sleep(60 * time.Millisecond) + 
return nil + } + + taskAliceB := func(ctx context.Context) error { + atomic.AddInt32(&counterB, 1) + time.Sleep(70 * time.Millisecond) + return nil + } + + // One task for Bob + taskBobC := func(ctx context.Context) error { + atomic.AddInt32(&counterC, 1) + time.Sleep(80 * time.Millisecond) + return nil + } + + // ACT + // Register all tasks with different intervals and groups + ts.scheduler.Register(ts.ctx, taskAliceA, Interval(50*time.Millisecond), GroupName("alice"), Name("a")) + ts.scheduler.Register(ts.ctx, taskAliceB, Interval(100*time.Millisecond), GroupName("alice"), Name("b")) + ts.scheduler.Register(ts.ctx, taskBobC, Interval(200*time.Millisecond), GroupName("bob"), Name("c")) + + // Wait and then stop Alice's tasks + time.Sleep(time.Second) + ts.scheduler.StopGroup("alice") + + // ASSERT #1 + shutdownLogPattern := func(group, name string) string { + return fmt.Sprintf( + `"task\.name":"%s","task\.group":"%s","time_taken_ms":.*"message":"Stopped task"`, + name, + group, + ) + } + + // Make sure Alice.A and Alice.B are stopped + assert.Regexp(t, shutdownLogPattern("alice", "a"), ts.logBuffer.String()) + assert.Regexp(t, shutdownLogPattern("alice", "b"), ts.logBuffer.String()) + + // But Bob.C is still running + assert.NotRegexp(t, shutdownLogPattern("bob", "c"), ts.logBuffer.String()) + + // ACT #2 + time.Sleep(200 * time.Millisecond) + ts.scheduler.StopGroup("bob") + + // ASSERT #2 + // Bob.C is not running + assert.Regexp(t, shutdownLogPattern("bob", "c"), ts.logBuffer.String()) + }) +} + +type testSuite struct { + ctx context.Context + scheduler *Scheduler + + logger zerolog.Logger + logBuffer *bytes.Buffer +} + +func newTestSuite(t *testing.T) *testSuite { + logBuffer := &bytes.Buffer{} + logger := zerolog.New(io.MultiWriter(zerolog.NewTestWriter(t), logBuffer)) + + return &testSuite{ + ctx: context.Background(), + scheduler: New(logger), + logger: logger, + logBuffer: logBuffer, + } +} From fb4309b9d88c0d4ec83eb77666cf414cdd15f66c Mon Sep 17 00:00:00 
2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 19 Dec 2024 15:55:26 +0100 Subject: [PATCH 04/11] Rename ticker.Run to ticker.Start for consistency --- pkg/ticker/ticker.go | 4 ++-- pkg/ticker/ticker_test.go | 16 ++++++++-------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/pkg/ticker/ticker.go b/pkg/ticker/ticker.go index 01125dee7c..9ec0d4cb06 100644 --- a/pkg/ticker/ticker.go +++ b/pkg/ticker/ticker.go @@ -95,7 +95,7 @@ func New(interval time.Duration, task Task, opts ...Opt) *Ticker { // Run creates and runs a new Ticker. func Run(ctx context.Context, interval time.Duration, task Task, opts ...Opt) error { - return New(interval, task, opts...).Run(ctx) + return New(interval, task, opts...).Start(ctx) } // Run runs the ticker by blocking current goroutine. It also invokes BEFORE ticker starts. @@ -103,7 +103,7 @@ func Run(ctx context.Context, interval time.Duration, task Task, opts ...Opt) er // - context is done (returns ctx.Err()) // - task returns an error or panics // - shutdown signal is received -func (t *Ticker) Run(ctx context.Context) (err error) { +func (t *Ticker) Start(ctx context.Context) (err error) { // prevent concurrent runs t.runnerMu.Lock() defer t.runnerMu.Unlock() diff --git a/pkg/ticker/ticker_test.go b/pkg/ticker/ticker_test.go index e7d0cf1ffe..276eb9457f 100644 --- a/pkg/ticker/ticker_test.go +++ b/pkg/ticker/ticker_test.go @@ -38,7 +38,7 @@ func TestTicker(t *testing.T) { }) // ACT - err := ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -67,7 +67,7 @@ func TestTicker(t *testing.T) { }) // ACT - err := ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.ErrorContains(t, err, "oops") @@ -100,7 +100,7 @@ func TestTicker(t *testing.T) { }) // ACT - err := ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.ErrorIs(t, err, context.DeadlineExceeded) @@ -133,7 +133,7 @@ func TestTicker(t *testing.T) { }() // ACT - err := 
ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.NoError(t, err) @@ -187,7 +187,7 @@ func TestTicker(t *testing.T) { // ACT // Imitate the ticker run in the background go func() { - err := ticker.Run(context.Background()) + err := ticker.Start(context.Background()) require.NoError(t, err) }() @@ -216,7 +216,7 @@ func TestTicker(t *testing.T) { // ACT go func() { - err := ticker.Run(context.Background()) + err := ticker.Start(context.Background()) require.NoError(t, err) }() @@ -244,7 +244,7 @@ func TestTicker(t *testing.T) { }) // ACT - err := ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.ErrorContains(t, err, "panic during ticker run: oops") @@ -267,7 +267,7 @@ func TestTicker(t *testing.T) { }) // ACT - err := ticker.Run(ctx) + err := ticker.Start(ctx) // ASSERT assert.ErrorContains( From 7660cb5f046b007c7ac5a93b0ce634f29d9bd2ab Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:26:03 +0100 Subject: [PATCH 05/11] Implement block ticker --- pkg/scheduler/chan.go | 117 ++++++++++++++++++++++++ pkg/scheduler/opts.go | 11 +-- pkg/scheduler/scheduler.go | 87 +++++++++--------- pkg/scheduler/scheduler_test.go | 152 ++++++++++++++++++++++++++++++-- 4 files changed, 312 insertions(+), 55 deletions(-) create mode 100644 pkg/scheduler/chan.go diff --git a/pkg/scheduler/chan.go b/pkg/scheduler/chan.go new file mode 100644 index 0000000000..9c7f5744da --- /dev/null +++ b/pkg/scheduler/chan.go @@ -0,0 +1,117 @@ +package scheduler + +import ( + "context" + "fmt" + "sync/atomic" + + cometbft "github.com/cometbft/cometbft/types" + "github.com/rs/zerolog" +) + +// blockTicker represents custom ticker implementation +// that ticks on new Zeta block events. 
+type blockTicker struct { + task Task + + // block channel that will be used to receive new blocks + blockChan <-chan cometbft.EventDataNewBlock + + // stopChan is used to stop the ticker + stopChan chan struct{} + + // doneChan is used to signal that the ticker has stopped (i.e. "blocking stop") + doneChan chan struct{} + + // atomic flag. `1` for RUNNING, `0` for STOPPED + status int32 + + logger zerolog.Logger +} + +type blockCtxKey struct{} + +func newBlockTicker(task Task, blockChan <-chan cometbft.EventDataNewBlock, logger zerolog.Logger) *blockTicker { + return &blockTicker{ + task: task, + blockChan: blockChan, + stopChan: make(chan struct{}), + doneChan: nil, + logger: logger, + } +} + +func withBlockEvent(ctx context.Context, event cometbft.EventDataNewBlock) context.Context { + return context.WithValue(ctx, blockCtxKey{}, event) +} + +// BlockFromContext returns cometbft.EventDataNewBlock from the context or false. +func BlockFromContext(ctx context.Context) (cometbft.EventDataNewBlock, bool) { + blockEvent, ok := ctx.Value(blockCtxKey{}).(cometbft.EventDataNewBlock) + return blockEvent, ok +} + +func (t *blockTicker) Start(ctx context.Context) error { + if !t.setRunning(true) { + return fmt.Errorf("ticker already started") + } + + t.doneChan = make(chan struct{}) + defer func() { + close(t.doneChan) + + // closes stopChan if it's not closed yet + if t.setRunning(false) { + close(t.stopChan) + } + }() + + for { + select { + case block, ok := <-t.blockChan: + // channel closed + if !ok { + t.logger.Warn().Msg("Block channel closed") + return nil + } + + ctx := withBlockEvent(ctx, block) + + if err := t.task(ctx); err != nil { + t.logger.Warn().Err(err).Msg("Task error") + } + case <-ctx.Done(): + t.logger.Warn().Err(ctx.Err()).Msg("Context error") + return nil + case <-t.stopChan: + // caller invoked t.stop() + return nil + } + } +} + +func (t *blockTicker) Stop() { + // noop + if !t.getRunning() { + return + } + + // notify async loop to stop + 
close(t.stopChan) + + // wait for the loop to stop + <-t.doneChan + t.setRunning(false) +} + +func (t *blockTicker) getRunning() bool { + return atomic.LoadInt32(&t.status) == 1 +} + +func (t *blockTicker) setRunning(running bool) (changed bool) { + if running { + return atomic.CompareAndSwapInt32(&t.status, 0, 1) + } + + return atomic.CompareAndSwapInt32(&t.status, 1, 0) +} diff --git a/pkg/scheduler/opts.go b/pkg/scheduler/opts.go index 56c1ec9d35..8e1db669bf 100644 --- a/pkg/scheduler/opts.go +++ b/pkg/scheduler/opts.go @@ -2,6 +2,8 @@ package scheduler import ( "time" + + cometbft "github.com/cometbft/cometbft/types" ) // Opt Definition option @@ -37,9 +39,8 @@ func IntervalUpdater(intervalUpdater func() time.Duration) Opt { return func(d *Definition) { d.intervalUpdater = intervalUpdater } } -// todo -// BlockListener alter behaviour to listen for new blocks instead of using a ticker. +// BlockTicker makes Definition to listen for new zeta blocks instead of using interval ticker. // IntervalUpdater is ignored. 
-//func BlockListener(listener <-chan cometbft.EventDataNewBlock) Opt { -// return func(d *Definition) { d.blockListener = listener } -//} +func BlockTicker(blocks <-chan cometbft.EventDataNewBlock) Opt { + return func(d *Definition) { d.blockChan = blocks } +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 9db57284ac..3dc8581477 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -11,6 +11,7 @@ import ( "sync" "time" + cometbft "github.com/cometbft/cometbft/types" "github.com/google/uuid" "github.com/rs/zerolog" @@ -48,13 +49,16 @@ type Definition struct { // arbitrary function that will be invoked by the scheduler task Task - // properties for ticker / blockChan ticker + // represents interval ticker and its options ticker *ticker.Ticker - blockChanTicker *blockChanTicker interval time.Duration intervalUpdater func() time.Duration skipper func() bool + // zeta block ticker (also supports skipper) + blockChan <-chan cometbft.EventDataNewBlock + blockChanTicker *blockTicker + // logging logFields map[string]any logger zerolog.Logger @@ -142,77 +146,74 @@ func (d *Definition) Stop() { d.scheduler.mu.Lock() delete(d.scheduler.definitions, d.id) d.scheduler.mu.Unlock() - d.logger.Info().Int64("time_taken_ms", time.Since(start).Milliseconds()).Msg("Stopped task") + + timeTakenMS := time.Since(start).Milliseconds() + d.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task") }() d.logger.Info().Msg("Stopping scheduler task") - if d.isTickerBasedTask() { + if d.isIntervalTicker() { d.ticker.StopBlocking() return } - // todo stop block chan ticker + d.blockChanTicker.Stop() } -func (d *Definition) isTickerBasedTask() bool { - // todo - return true +func (d *Definition) isIntervalTicker() bool { + return d.blockChan == nil } func (d *Definition) startTicker(ctx context.Context) { - if d.isTickerBasedTask() { - d.ticker = ticker.New( - d.interval, - d.tickerTask(), - ticker.WithLogger(d.logger, 
d.name), - ) - - bgTask := func(ctx context.Context) error { - d.logger.Info().Msg("Starting task") - return d.ticker.Run(ctx) - } + d.logger.Info().Msg("Starting scheduler task") + + if d.isIntervalTicker() { + d.ticker = ticker.New(d.interval, d.invokeByInterval, ticker.WithLogger(d.logger, d.name)) + bg.Work(ctx, d.ticker.Start, bg.WithLogger(d.logger)) - // Run async worker (no need for logger here) - bg.Work(ctx, bgTask) return } - // todo start block chan ticker -} + d.blockChanTicker = newBlockTicker(d.invoke, d.blockChan, d.logger) -// tickerTask wraps Task to be executed by ticker.Ticker -func (d *Definition) tickerTask() ticker.Task { - return func(ctx context.Context, t *ticker.Ticker) error { - d.invoke(ctx) + bg.Work(ctx, d.blockChanTicker.Start, bg.WithLogger(d.logger)) +} - if d.intervalUpdater != nil { - // noop if interval is not changed - t.SetInterval(d.intervalUpdater()) - } +// invokeByInterval a ticker.Task wrapper of invoke. +func (d *Definition) invokeByInterval(ctx context.Context, t *ticker.Ticker) error { + if err := d.invoke(ctx); err != nil { + d.logger.Error().Err(err).Msg("task failed") + } - return nil + if d.intervalUpdater != nil { + // noop if interval is not changed + t.SetInterval(d.intervalUpdater()) } + + return nil } // invoke executes a given Task with logging & telemetry. -func (d *Definition) invoke(ctx context.Context) { +func (d *Definition) invoke(ctx context.Context) error { // skip tick if d.skipper != nil && d.skipper() { - return + return nil } d.logger.Debug().Msg("Invoking task") - // todo metrics - // - duration - // - outcome (skip, err, ok) - // - bump invocation counter err := d.task(ctx) - if err != nil { - d.logger.Error().Err(err).Msg("task failed") - } + // todo metrics (TBD) + // - duration (time taken) + // - outcome (skip, err, ok) + // - bump invocation counter + // - "last invoked at" timestamp (?) 
+ // - chain_id + // - metrics cardinality: "task_group (?)" "task_name", "status", "chain_id" + + return err } func newDefinitionLogger(def *Definition, logger zerolog.Logger) zerolog.Logger { @@ -229,7 +230,5 @@ func newDefinitionLogger(def *Definition, logger zerolog.Logger) zerolog.Logger taskType = "block_ticker" } - logOpts.Str("task.type", taskType) - - return logOpts.Logger() + return logOpts.Str("task.type", taskType).Logger() } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index 71cf7ebdc3..ea1bc98412 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -9,8 +9,10 @@ import ( "testing" "time" + cometbft "github.com/cometbft/cometbft/types" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestScheduler(t *testing.T) { @@ -40,7 +42,7 @@ func TestScheduler(t *testing.T) { assert.Equal(t, int32(2), counter) // Check logs - assert.Contains(t, ts.logBuffer.String(), "Stopped task") + assert.Contains(t, ts.logBuffer.String(), "Stopped scheduler task") assert.Contains(t, ts.logBuffer.String(), `"task.group":"default"`) }) @@ -217,11 +219,8 @@ func TestScheduler(t *testing.T) { // ASSERT #1 shutdownLogPattern := func(group, name string) string { - return fmt.Sprintf( - `"task\.name":"%s","task\.group":"%s","time_taken_ms":.*"message":"Stopped task"`, - name, - group, - ) + const pattern = `"task\.name":"%s","task\.group":"%s",.*"message":"Stopped scheduler task"` + return fmt.Sprintf(pattern, name, group) } // Make sure Alice.A and Alice.B are stopped @@ -239,6 +238,113 @@ func TestScheduler(t *testing.T) { // Bob.C is not running assert.Regexp(t, shutdownLogPattern("bob", "c"), ts.logBuffer.String()) }) + + t.Run("Block tick: tick is faster than the block", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + // Given a task that increments a counter by block height + var counter int64 + + task := func(ctx 
context.Context) error { + // Note that ctx contains the block event + blockEvent, ok := BlockFromContext(ctx) + require.True(t, ok) + + atomic.AddInt64(&counter, blockEvent.Block.Height) + time.Sleep(100 * time.Millisecond) + return nil + } + + // Given block ticker + blockChan := ts.mockBlockChan(200*time.Millisecond, 0) + + // ACT + // Register block + ts.scheduler.Register(ts.ctx, task, BlockTicker(blockChan)) + time.Sleep(1200 * time.Millisecond) + ts.scheduler.Stop() + + // ASSERT + assert.Equal(t, int64(21), counter) + assert.Contains(t, ts.logBuffer.String(), "Stopped scheduler task") + }) + + t.Run("Block tick: tick is slower than the block", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + // Given a task that increments a counter on start + // and then decrements before finish + var counter int64 + + task := func(ctx context.Context) error { + _, ok := BlockFromContext(ctx) + require.True(t, ok) + + atomic.AddInt64(&counter, 1) + time.Sleep(256 * time.Millisecond) + atomic.AddInt64(&counter, -1) + return nil + } + + // Given block ticker + blockChan := ts.mockBlockChan(100*time.Millisecond, 0) + + // ACT + // Register block + ts.scheduler.Register(ts.ctx, task, BlockTicker(blockChan)) + time.Sleep(1200 * time.Millisecond) + ts.scheduler.Stop() + + // ASSERT + // zero indicates that Stop() waits for current iteration to finish (graceful shutdown) + assert.Equal(t, int64(0), counter) + }) + + t.Run("Block tick: chan closes unexpectedly", func(t *testing.T) { + t.Parallel() + + // ARRANGE + ts := newTestSuite(t) + + // Given a task that increments a counter on start + // and then decrements before finish + var counter int64 + + task := func(ctx context.Context) error { + _, ok := BlockFromContext(ctx) + require.True(t, ok) + + atomic.AddInt64(&counter, 1) + time.Sleep(200 * time.Millisecond) + atomic.AddInt64(&counter, -1) + return nil + } + + // Given block ticker that closes after 3 blocks + blockChan := 
ts.mockBlockChan(100*time.Millisecond, 3) + + // ACT + // Register block + ts.scheduler.Register(ts.ctx, task, BlockTicker(blockChan), Name("block-tick")) + + // Wait for a while + time.Sleep(1000 * time.Millisecond) + + // Stop the scheduler. + // Note that actually the ticker is already stopped. + ts.scheduler.Stop() + + // ASSERT + // zero indicates that Stop() waits for current iteration to finish (graceful shutdown) + assert.Equal(t, int64(0), counter) + assert.Contains(t, ts.logBuffer.String(), "Block channel closed") + }) } type testSuite struct { @@ -260,3 +366,37 @@ func newTestSuite(t *testing.T) *testSuite { logBuffer: logBuffer, } } + +// mockBlockChan mocks websocket blocks. Optionally halts after lastBlock. +func (ts *testSuite) mockBlockChan(interval time.Duration, lastBlock int64) chan cometbft.EventDataNewBlock { + producer := make(chan cometbft.EventDataNewBlock) + + go func() { + var blockNumber int64 + + for { + blockNumber++ + ts.logger.Info().Int64("block_number", blockNumber).Msg("Producing new block") + + header := cometbft.Header{ + ChainID: "zeta", + Height: blockNumber, + Time: time.Now(), + } + + producer <- cometbft.EventDataNewBlock{ + Block: &cometbft.Block{Header: header}, + } + + if blockNumber > 0 && blockNumber == lastBlock { + ts.logger.Info().Int64("block_number", blockNumber).Msg("Halting block producer") + close(producer) + return + } + + time.Sleep(interval) + } + }() + + return producer +} From 5f0b2472b8d3b85b0154ac0730b2044675b671ac Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Thu, 19 Dec 2024 18:40:08 +0100 Subject: [PATCH 06/11] Update changelog --- changelog.md | 1 + 1 file changed, 1 insertion(+) diff --git a/changelog.md b/changelog.md index 66136741a3..1e471a41cf 100644 --- a/changelog.md +++ b/changelog.md @@ -17,6 +17,7 @@ * [3170](https://github.com/zeta-chain/node/pull/3170) - revamp TSS package in zetaclient * [3291](https://github.com/zeta-chain/node/pull/3291) - 
revamp zetaclient initialization (+ graceful shutdown) +* [3319](https://github.com/zeta-chain/node/pull/3319) - implement scheduler for zetaclient ### Fixes From 562cf71fe583e0d853f3fb1ffe5cb61afe9c3431 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:01:27 +0100 Subject: [PATCH 07/11] Rename Task to Executable. Rename Definition to Task --- pkg/scheduler/chan.go | 8 +- pkg/scheduler/opts.go | 20 ++--- pkg/scheduler/scheduler.go | 134 ++++++++++++++++---------------- pkg/scheduler/scheduler_test.go | 36 ++++----- 4 files changed, 98 insertions(+), 100 deletions(-) diff --git a/pkg/scheduler/chan.go b/pkg/scheduler/chan.go index 9c7f5744da..3d7e029f12 100644 --- a/pkg/scheduler/chan.go +++ b/pkg/scheduler/chan.go @@ -12,7 +12,7 @@ import ( // blockTicker represents custom ticker implementation // that ticks on new Zeta block events. type blockTicker struct { - task Task + exec Executable // block channel that will be used to receive new blocks blockChan <-chan cometbft.EventDataNewBlock @@ -31,9 +31,9 @@ type blockTicker struct { type blockCtxKey struct{} -func newBlockTicker(task Task, blockChan <-chan cometbft.EventDataNewBlock, logger zerolog.Logger) *blockTicker { +func newBlockTicker(task Executable, blockChan <-chan cometbft.EventDataNewBlock, logger zerolog.Logger) *blockTicker { return &blockTicker{ - task: task, + exec: task, blockChan: blockChan, stopChan: make(chan struct{}), doneChan: nil, @@ -77,7 +77,7 @@ func (t *blockTicker) Start(ctx context.Context) error { ctx := withBlockEvent(ctx, block) - if err := t.task(ctx); err != nil { + if err := t.exec(ctx); err != nil { t.logger.Warn().Err(err).Msg("Task error") } case <-ctx.Done(): diff --git a/pkg/scheduler/opts.go b/pkg/scheduler/opts.go index 8e1db669bf..1dc9bb872f 100644 --- a/pkg/scheduler/opts.go +++ b/pkg/scheduler/opts.go @@ -6,41 +6,41 @@ import ( cometbft "github.com/cometbft/cometbft/types" ) -// Opt Definition option -type Opt 
func(*Definition) +// Opt Task option +type Opt func(task *Task) // Name sets task name. func Name(name string) Opt { - return func(d *Definition) { d.name = name } + return func(d *Task) { d.name = name } } // GroupName sets task group. Otherwise, defaults to DefaultGroup. func GroupName(group Group) Opt { - return func(d *Definition) { d.group = group } + return func(d *Task) { d.group = group } } -// LogFields augments definition logger with some fields. +// LogFields augments Task's logger with some fields. func LogFields(fields map[string]any) Opt { - return func(d *Definition) { d.logFields = fields } + return func(d *Task) { d.logFields = fields } } // Interval sets initial task interval. func Interval(interval time.Duration) Opt { - return func(d *Definition) { d.interval = interval } + return func(d *Task) { d.interval = interval } } // Skipper sets task skipper function func Skipper(skipper func() bool) Opt { - return func(d *Definition) { d.skipper = skipper } + return func(d *Task) { d.skipper = skipper } } // IntervalUpdater sets interval updater function. func IntervalUpdater(intervalUpdater func() time.Duration) Opt { - return func(d *Definition) { d.intervalUpdater = intervalUpdater } + return func(d *Task) { d.intervalUpdater = intervalUpdater } } // BlockTicker makes Definition to listen for new zeta blocks instead of using interval ticker. // IntervalUpdater is ignored. func BlockTicker(blocks <-chan cometbft.EventDataNewBlock) Opt { - return func(d *Definition) { d.blockChan = blocks } + return func(d *Task) { d.blockChan = blocks } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 3dc8581477..01cf524ec1 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -21,23 +21,22 @@ import ( // Scheduler represents background task scheduler. 
type Scheduler struct { - definitions map[uuid.UUID]*Definition - mu sync.RWMutex - logger zerolog.Logger + tasks map[uuid.UUID]*Task + mu sync.RWMutex + logger zerolog.Logger } -// Task represents scheduler's task -type Task func(ctx context.Context) error +// Executable arbitrary function that can be executed. +type Executable func(ctx context.Context) error -// Group represents Definition group. -// Definitions can be grouped for easier management. +// Group represents Task group. Tasks can be grouped for easier management. type Group string -// DefaultGroup is the default group for definitions. +// DefaultGroup is the default task group. const DefaultGroup = Group("default") -// Definition represents a configuration of a Task -type Definition struct { +// Task represents scheduler's task. +type Task struct { // ref to the Scheduler is required scheduler *Scheduler @@ -46,8 +45,7 @@ type Definition struct { group Group name string - // arbitrary function that will be invoked by the scheduler - task Task + exec Executable // represents interval ticker and its options ticker *ticker.Ticker @@ -67,35 +65,35 @@ type Definition struct { // New Scheduler instance. 
func New(logger zerolog.Logger) *Scheduler { return &Scheduler{ - definitions: make(map[uuid.UUID]*Definition), - logger: logger.With().Str("module", "scheduler").Logger(), + tasks: make(map[uuid.UUID]*Task), + logger: logger.With().Str("module", "scheduler").Logger(), } } -// Register registers and starts new task in the background -func (s *Scheduler) Register(ctx context.Context, task Task, opts ...Opt) *Definition { +// Register registers and starts new Task in the background +func (s *Scheduler) Register(ctx context.Context, exec Executable, opts ...Opt) *Task { id := uuid.New() - def := &Definition{ + task := &Task{ scheduler: s, id: id, group: DefaultGroup, name: id.String(), - task: task, + exec: exec, interval: time.Second, } for _, opt := range opts { - opt(def) + opt(task) } - def.logger = newDefinitionLogger(def, s.logger) + task.logger = newTaskLogger(task, s.logger) - def.startTicker(ctx) + task.startTicker(ctx) s.mu.Lock() - s.definitions[id] = def + s.tasks[id] = task s.mu.Unlock() - return def + return task } // Stop stops all tasks. @@ -105,105 +103,105 @@ func (s *Scheduler) Stop() { // StopGroup stops all tasks in the group. func (s *Scheduler) StopGroup(group Group) { - var selectedDefs []*Definition + var selectedTasks []*Task s.mu.RLock() - // Filter desired definitions - for _, def := range s.definitions { + // Filter desired tasks + for _, task := range s.tasks { // "" is for wildcard i.e. 
all groups - if group == "" || def.group == group { - selectedDefs = append(selectedDefs, def) + if group == "" || task.group == group { + selectedTasks = append(selectedTasks, task) } } s.mu.RUnlock() - if len(selectedDefs) == 0 { + if len(selectedTasks) == 0 { return } // Stop all selected tasks concurrently var wg sync.WaitGroup - wg.Add(len(selectedDefs)) + wg.Add(len(selectedTasks)) - for _, def := range selectedDefs { - go func(def *Definition) { + for _, task := range selectedTasks { + go func(task *Task) { defer wg.Done() - def.Stop() - }(def) + task.Stop() + }(task) } wg.Wait() } // Stop stops the task and offloads it from the scheduler. -func (d *Definition) Stop() { +func (t *Task) Stop() { start := time.Now() - // delete definition from scheduler + // delete task from scheduler defer func() { - d.scheduler.mu.Lock() - delete(d.scheduler.definitions, d.id) - d.scheduler.mu.Unlock() + t.scheduler.mu.Lock() + delete(t.scheduler.tasks, t.id) + t.scheduler.mu.Unlock() timeTakenMS := time.Since(start).Milliseconds() - d.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task") + t.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task") }() - d.logger.Info().Msg("Stopping scheduler task") + t.logger.Info().Msg("Stopping scheduler task") - if d.isIntervalTicker() { - d.ticker.StopBlocking() + if t.isIntervalTicker() { + t.ticker.StopBlocking() return } - d.blockChanTicker.Stop() + t.blockChanTicker.Stop() } -func (d *Definition) isIntervalTicker() bool { - return d.blockChan == nil +func (t *Task) isIntervalTicker() bool { + return t.blockChan == nil } -func (d *Definition) startTicker(ctx context.Context) { - d.logger.Info().Msg("Starting scheduler task") +func (t *Task) startTicker(ctx context.Context) { + t.logger.Info().Msg("Starting scheduler task") - if d.isIntervalTicker() { - d.ticker = ticker.New(d.interval, d.invokeByInterval, ticker.WithLogger(d.logger, d.name)) - bg.Work(ctx, d.ticker.Start, 
bg.WithLogger(d.logger)) + if t.isIntervalTicker() { + t.ticker = ticker.New(t.interval, t.invokeByInterval, ticker.WithLogger(t.logger, t.name)) + bg.Work(ctx, t.ticker.Start, bg.WithLogger(t.logger)) return } - d.blockChanTicker = newBlockTicker(d.invoke, d.blockChan, d.logger) + t.blockChanTicker = newBlockTicker(t.invoke, t.blockChan, t.logger) - bg.Work(ctx, d.blockChanTicker.Start, bg.WithLogger(d.logger)) + bg.Work(ctx, t.blockChanTicker.Start, bg.WithLogger(t.logger)) } // invokeByInterval a ticker.Task wrapper of invoke. -func (d *Definition) invokeByInterval(ctx context.Context, t *ticker.Ticker) error { - if err := d.invoke(ctx); err != nil { - d.logger.Error().Err(err).Msg("task failed") +func (t *Task) invokeByInterval(ctx context.Context, tt *ticker.Ticker) error { + if err := t.invoke(ctx); err != nil { + t.logger.Error().Err(err).Msg("task failed") } - if d.intervalUpdater != nil { + if t.intervalUpdater != nil { // noop if interval is not changed - t.SetInterval(d.intervalUpdater()) + tt.SetInterval(t.intervalUpdater()) } return nil } // invoke executes a given Task with logging & telemetry. -func (d *Definition) invoke(ctx context.Context) error { +func (t *Task) invoke(ctx context.Context) error { // skip tick - if d.skipper != nil && d.skipper() { + if t.skipper != nil && t.skipper() { return nil } - d.logger.Debug().Msg("Invoking task") + t.logger.Debug().Msg("Invoking task") - err := d.task(ctx) + err := t.exec(ctx) // todo metrics (TBD) // - duration (time taken) @@ -216,17 +214,17 @@ func (d *Definition) invoke(ctx context.Context) error { return err } -func newDefinitionLogger(def *Definition, logger zerolog.Logger) zerolog.Logger { +func newTaskLogger(task *Task, logger zerolog.Logger) zerolog.Logger { logOpts := logger.With(). - Str("task.name", def.name). - Str("task.group", string(def.group)) + Str("task.name", task.name). 
+ Str("task.group", string(task.group)) - if len(def.logFields) > 0 { - logOpts = logOpts.Fields(def.logFields) + if len(task.logFields) > 0 { + logOpts = logOpts.Fields(task.logFields) } taskType := "interval_ticker" - if def.blockChanTicker != nil { + if task.blockChanTicker != nil { taskType = "block_ticker" } diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index ea1bc98412..cc50b04c25 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -24,14 +24,14 @@ func TestScheduler(t *testing.T) { var counter int32 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { atomic.AddInt32(&counter, 1) return nil } // ACT // Register task and stop it after x1.5 interval. - ts.scheduler.Register(ts.ctx, task) + ts.scheduler.Register(ts.ctx, exec) time.Sleep(1500 * time.Millisecond) ts.scheduler.Stop() @@ -54,7 +54,7 @@ func TestScheduler(t *testing.T) { var counter int32 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { atomic.AddInt32(&counter, 1) return nil } @@ -63,7 +63,7 @@ func TestScheduler(t *testing.T) { // Register task and stop it after x1.5 interval. ts.scheduler.Register( ts.ctx, - task, + exec, Name("counter-inc"), GroupName("my-custom-group"), Interval(300*time.Millisecond), @@ -85,7 +85,7 @@ func TestScheduler(t *testing.T) { assert.Contains(t, ts.logBuffer.String(), `"blockchain":"doge","validators":["alice","bob"]`) }) - t.Run("Definition can also stop itself", func(t *testing.T) { + t.Run("Task can stop itself", func(t *testing.T) { t.Parallel() // ARRANGE @@ -93,17 +93,17 @@ func TestScheduler(t *testing.T) { var counter int32 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { atomic.AddInt32(&counter, 1) return nil } // ACT // Register task and stop it after x1.5 interval. 
- def := ts.scheduler.Register(ts.ctx, task, Interval(300*time.Millisecond)) + task := ts.scheduler.Register(ts.ctx, exec, Interval(300*time.Millisecond)) time.Sleep(time.Second) - def.Stop() + task.Stop() // ASSERT // Counter should be 1 + 1000/300 = 4 (first run + interval runs) @@ -118,7 +118,7 @@ func TestScheduler(t *testing.T) { var counter int32 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { atomic.AddInt32(&counter, 1) return nil } @@ -133,10 +133,10 @@ func TestScheduler(t *testing.T) { // ACT // Register task and stop it after x1.5 interval. - def := ts.scheduler.Register(ts.ctx, task, Interval(50*time.Millisecond), Skipper(skipper)) + task := ts.scheduler.Register(ts.ctx, exec, Interval(50*time.Millisecond), Skipper(skipper)) time.Sleep(time.Second) - def.Stop() + task.Stop() // ASSERT assert.Equal(t, int32(maxValue), counter) @@ -150,7 +150,7 @@ func TestScheduler(t *testing.T) { var counter int32 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { atomic.AddInt32(&counter, 1) return nil } @@ -162,10 +162,10 @@ func TestScheduler(t *testing.T) { // ACT // Register task and stop it after x1.5 interval. 
- def := ts.scheduler.Register(ts.ctx, task, Interval(time.Millisecond), IntervalUpdater(intervalUpdater)) + task := ts.scheduler.Register(ts.ctx, exec, Interval(time.Millisecond), IntervalUpdater(intervalUpdater)) time.Sleep(time.Second) - def.Stop() + task.Stop() // ASSERT assert.Equal(t, int32(6), counter) @@ -282,7 +282,7 @@ func TestScheduler(t *testing.T) { // and then decrements before finish var counter int64 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { _, ok := BlockFromContext(ctx) require.True(t, ok) @@ -297,7 +297,7 @@ func TestScheduler(t *testing.T) { // ACT // Register block - ts.scheduler.Register(ts.ctx, task, BlockTicker(blockChan)) + ts.scheduler.Register(ts.ctx, exec, BlockTicker(blockChan)) time.Sleep(1200 * time.Millisecond) ts.scheduler.Stop() @@ -316,7 +316,7 @@ func TestScheduler(t *testing.T) { // and then decrements before finish var counter int64 - task := func(ctx context.Context) error { + exec := func(ctx context.Context) error { _, ok := BlockFromContext(ctx) require.True(t, ok) @@ -331,7 +331,7 @@ func TestScheduler(t *testing.T) { // ACT // Register block - ts.scheduler.Register(ts.ctx, task, BlockTicker(blockChan), Name("block-tick")) + ts.scheduler.Register(ts.ctx, exec, BlockTicker(blockChan), Name("block-tick")) // Wait for a while time.Sleep(1000 * time.Millisecond) From cf53e4249f1f1ede76db74d9212a9fc0871aae22 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 20 Dec 2024 15:11:03 +0100 Subject: [PATCH 08/11] Use atomic.Bool --- pkg/scheduler/chan.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/pkg/scheduler/chan.go b/pkg/scheduler/chan.go index 3d7e029f12..f77eb6028a 100644 --- a/pkg/scheduler/chan.go +++ b/pkg/scheduler/chan.go @@ -9,8 +9,8 @@ import ( "github.com/rs/zerolog" ) -// blockTicker represents custom ticker implementation -// that ticks on new Zeta block events. 
+// blockTicker represents custom ticker implementation that ticks on new Zeta block events. +// Pass blockTicker ONLY by pointer. type blockTicker struct { exec Executable @@ -23,8 +23,7 @@ type blockTicker struct { // doneChan is used to signal that the ticker has stopped (i.e. "blocking stop") doneChan chan struct{} - // atomic flag. `1` for RUNNING, `0` for STOPPED - status int32 + isRunning atomic.Bool logger zerolog.Logger } @@ -92,7 +91,7 @@ func (t *blockTicker) Start(ctx context.Context) error { func (t *blockTicker) Stop() { // noop - if !t.getRunning() { + if !t.isRunning.Load() { return } @@ -104,14 +103,10 @@ func (t *blockTicker) Stop() { t.setRunning(false) } -func (t *blockTicker) getRunning() bool { - return atomic.LoadInt32(&t.status) == 1 -} - func (t *blockTicker) setRunning(running bool) (changed bool) { if running { - return atomic.CompareAndSwapInt32(&t.status, 0, 1) + return t.isRunning.CompareAndSwap(false, true) } - return atomic.CompareAndSwapInt32(&t.status, 1, 0) + return t.isRunning.CompareAndSwap(true, false) } From 2d0241ef620c331d6d2cce53cd512964aceaa577 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 20 Dec 2024 17:39:16 +0100 Subject: [PATCH 09/11] Fix blockTicker concurrency issues. Add intervalTicker --- pkg/scheduler/{chan.go => tickers.go} | 102 ++++++++++++++++++++------ 1 file changed, 81 insertions(+), 21 deletions(-) rename pkg/scheduler/{chan.go => tickers.go} (55%) diff --git a/pkg/scheduler/chan.go b/pkg/scheduler/tickers.go similarity index 55% rename from pkg/scheduler/chan.go rename to pkg/scheduler/tickers.go index f77eb6028a..613194c44b 100644 --- a/pkg/scheduler/chan.go +++ b/pkg/scheduler/tickers.go @@ -3,12 +3,53 @@ package scheduler import ( "context" "fmt" - "sync/atomic" + "sync" + "time" cometbft "github.com/cometbft/cometbft/types" "github.com/rs/zerolog" + + "github.com/zeta-chain/node/pkg/ticker" ) +// intervalTicker wrapper for ticker.Ticker. 
+type intervalTicker struct { + ticker *ticker.Ticker +} + +func newIntervalTicker( + task Executable, + interval time.Duration, + intervalUpdater func() time.Duration, + taskName string, + logger zerolog.Logger, +) *intervalTicker { + wrapper := func(ctx context.Context, t *ticker.Ticker) error { + if err := task(ctx); err != nil { + logger.Error().Err(err).Msg("task failed") + } + + if intervalUpdater != nil { + // noop if interval is not changed + t.SetInterval(intervalUpdater()) + } + + return nil + } + + tt := ticker.New(interval, wrapper, ticker.WithLogger(logger, taskName)) + + return &intervalTicker{ticker: tt} +} + +func (t *intervalTicker) Start(ctx context.Context) error { + return t.ticker.Start(ctx) +} + +func (t *intervalTicker) Stop() { + t.ticker.StopBlocking() +} + // blockTicker represents custom ticker implementation that ticks on new Zeta block events. // Pass blockTicker ONLY by pointer. type blockTicker struct { @@ -23,7 +64,8 @@ type blockTicker struct { // doneChan is used to signal that the ticker has stopped (i.e. 
"blocking stop") doneChan chan struct{} - isRunning atomic.Bool + isRunning bool + mu sync.Mutex logger zerolog.Logger } @@ -34,8 +76,6 @@ func newBlockTicker(task Executable, blockChan <-chan cometbft.EventDataNewBlock return &blockTicker{ exec: task, blockChan: blockChan, - stopChan: make(chan struct{}), - doneChan: nil, logger: logger, } } @@ -51,19 +91,14 @@ func BlockFromContext(ctx context.Context) (cometbft.EventDataNewBlock, bool) { } func (t *blockTicker) Start(ctx context.Context) error { - if !t.setRunning(true) { - return fmt.Errorf("ticker already started") + if err := t.init(); err != nil { + return err } - t.doneChan = make(chan struct{}) - defer func() { - close(t.doneChan) + defer t.cleanup() - // closes stopChan if it's not closed yet - if t.setRunning(false) { - close(t.stopChan) - } - }() + // release Stop() blocking + defer func() { close(t.doneChan) }() for { select { @@ -77,7 +112,7 @@ func (t *blockTicker) Start(ctx context.Context) error { ctx := withBlockEvent(ctx, block) if err := t.exec(ctx); err != nil { - t.logger.Warn().Err(err).Msg("Task error") + t.logger.Error().Err(err).Msg("Task error") } case <-ctx.Done(): t.logger.Warn().Err(ctx.Err()).Msg("Content error") @@ -90,8 +125,11 @@ func (t *blockTicker) Start(ctx context.Context) error { } func (t *blockTicker) Stop() { + t.mu.Lock() + defer t.mu.Unlock() + // noop - if !t.isRunning.Load() { + if !t.isRunning { return } @@ -100,13 +138,35 @@ func (t *blockTicker) Stop() { // wait for the loop to stop <-t.doneChan - t.setRunning(false) + + t.isRunning = false +} + +func (t *blockTicker) init() error { + t.mu.Lock() + defer t.mu.Unlock() + + if t.isRunning { + return fmt.Errorf("ticker already started") + } + + t.stopChan = make(chan struct{}) + t.doneChan = make(chan struct{}) + t.isRunning = true + + return nil } -func (t *blockTicker) setRunning(running bool) (changed bool) { - if running { - return t.isRunning.CompareAndSwap(false, true) +// if ticker was stopped NOT by Stop() 
method, we want to make a cleanup +func (t *blockTicker) cleanup() { + t.mu.Lock() + defer t.mu.Unlock() + + // noop + if !t.isRunning { + return } - return t.isRunning.CompareAndSwap(true, false) + t.isRunning = false + close(t.stopChan) } From 60c166dbfb929c1f23863983554c9b670d22cf0e Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 20 Dec 2024 17:40:21 +0100 Subject: [PATCH 10/11] Simplify Task. Add support for different tickers --- pkg/scheduler/opts.go | 20 +++--- pkg/scheduler/scheduler.go | 122 ++++++++++++++------------------ pkg/scheduler/scheduler_test.go | 1 + 3 files changed, 64 insertions(+), 79 deletions(-) diff --git a/pkg/scheduler/opts.go b/pkg/scheduler/opts.go index 1dc9bb872f..8e5d54e370 100644 --- a/pkg/scheduler/opts.go +++ b/pkg/scheduler/opts.go @@ -7,40 +7,40 @@ import ( ) // Opt Task option -type Opt func(task *Task) +type Opt func(task *Task, taskOpts *taskOpts) // Name sets task name. func Name(name string) Opt { - return func(d *Task) { d.name = name } + return func(t *Task, _ *taskOpts) { t.name = name } } // GroupName sets task group. Otherwise, defaults to DefaultGroup. func GroupName(group Group) Opt { - return func(d *Task) { d.group = group } + return func(t *Task, _ *taskOpts) { t.group = group } } // LogFields augments Task's logger with some fields. func LogFields(fields map[string]any) Opt { - return func(d *Task) { d.logFields = fields } + return func(_ *Task, opts *taskOpts) { opts.logFields = fields } } // Interval sets initial task interval. func Interval(interval time.Duration) Opt { - return func(d *Task) { d.interval = interval } + return func(_ *Task, opts *taskOpts) { opts.interval = interval } } // Skipper sets task skipper function func Skipper(skipper func() bool) Opt { - return func(d *Task) { d.skipper = skipper } + return func(t *Task, _ *taskOpts) { t.skipper = skipper } } // IntervalUpdater sets interval updater function. 
func IntervalUpdater(intervalUpdater func() time.Duration) Opt { - return func(d *Task) { d.intervalUpdater = intervalUpdater } + return func(_ *Task, opts *taskOpts) { opts.intervalUpdater = intervalUpdater } } -// BlockTicker makes Definition to listen for new zeta blocks instead of using interval ticker. -// IntervalUpdater is ignored. +// BlockTicker makes Task to listen for new zeta blocks +// instead of using interval ticker. IntervalUpdater is ignored. func BlockTicker(blocks <-chan cometbft.EventDataNewBlock) Opt { - return func(d *Task) { d.blockChan = blocks } + return func(_ *Task, opts *taskOpts) { opts.blockChan = blocks } } diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index 01cf524ec1..d0f931d7a8 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -16,7 +16,6 @@ import ( "github.com/rs/zerolog" "github.com/zeta-chain/node/pkg/bg" - "github.com/zeta-chain/node/pkg/ticker" ) // Scheduler represents background task scheduler. @@ -35,31 +34,37 @@ type Group string // DefaultGroup is the default task group. const DefaultGroup = Group("default") +// tickable ticker abstraction to support different implementations +type tickable interface { + Start(ctx context.Context) error + Stop() +} + // Task represents scheduler's task. 
type Task struct { // ref to the Scheduler is required scheduler *Scheduler - // naming stuff id uuid.UUID group Group name string exec Executable - // represents interval ticker and its options - ticker *ticker.Ticker + // ticker abstraction to support different implementations + ticker tickable + skipper func() bool + + logger zerolog.Logger +} + +type taskOpts struct { interval time.Duration intervalUpdater func() time.Duration - skipper func() bool - // zeta block ticker (also supports skipper) - blockChan <-chan cometbft.EventDataNewBlock - blockChanTicker *blockTicker + blockChan <-chan cometbft.EventDataNewBlock - // logging logFields map[string]any - logger zerolog.Logger } // New Scheduler instance. @@ -79,15 +84,21 @@ func (s *Scheduler) Register(ctx context.Context, exec Executable, opts ...Opt) group: DefaultGroup, name: id.String(), exec: exec, - interval: time.Second, } + + config := &taskOpts{ + interval: time.Second, + } + for _, opt := range opts { - opt(task) + opt(task, config) } - task.logger = newTaskLogger(task, s.logger) + task.logger = newTaskLogger(task, config, s.logger) + task.ticker = newTickable(task, config) - task.startTicker(ctx) + task.logger.Info().Msg("Starting scheduler task") + bg.Work(ctx, task.ticker.Start, bg.WithLogger(task.logger)) s.mu.Lock() s.tasks[id] = task @@ -137,63 +148,21 @@ func (s *Scheduler) StopGroup(group Group) { // Stop stops the task and offloads it from the scheduler. 
func (t *Task) Stop() { - start := time.Now() - - // delete task from scheduler - defer func() { - t.scheduler.mu.Lock() - delete(t.scheduler.tasks, t.id) - t.scheduler.mu.Unlock() - - timeTakenMS := time.Since(start).Milliseconds() - t.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task") - }() - t.logger.Info().Msg("Stopping scheduler task") + start := time.Now() - if t.isIntervalTicker() { - t.ticker.StopBlocking() - return - } - - t.blockChanTicker.Stop() -} - -func (t *Task) isIntervalTicker() bool { - return t.blockChan == nil -} - -func (t *Task) startTicker(ctx context.Context) { - t.logger.Info().Msg("Starting scheduler task") - - if t.isIntervalTicker() { - t.ticker = ticker.New(t.interval, t.invokeByInterval, ticker.WithLogger(t.logger, t.name)) - bg.Work(ctx, t.ticker.Start, bg.WithLogger(t.logger)) - - return - } - - t.blockChanTicker = newBlockTicker(t.invoke, t.blockChan, t.logger) - - bg.Work(ctx, t.blockChanTicker.Start, bg.WithLogger(t.logger)) -} - -// invokeByInterval a ticker.Task wrapper of invoke. -func (t *Task) invokeByInterval(ctx context.Context, tt *ticker.Ticker) error { - if err := t.invoke(ctx); err != nil { - t.logger.Error().Err(err).Msg("task failed") - } + t.ticker.Stop() - if t.intervalUpdater != nil { - // noop if interval is not changed - tt.SetInterval(t.intervalUpdater()) - } + t.scheduler.mu.Lock() + delete(t.scheduler.tasks, t.id) + t.scheduler.mu.Unlock() - return nil + timeTakenMS := time.Since(start).Milliseconds() + t.logger.Info().Int64("time_taken_ms", timeTakenMS).Msg("Stopped scheduler task") } -// invoke executes a given Task with logging & telemetry. -func (t *Task) invoke(ctx context.Context) error { +// execute executes Task with additional logging and metrics. 
+func (t *Task) execute(ctx context.Context) error { // skip tick if t.skipper != nil && t.skipper() { return nil @@ -214,19 +183,34 @@ func (t *Task) invoke(ctx context.Context) error { return err } -func newTaskLogger(task *Task, logger zerolog.Logger) zerolog.Logger { +func newTaskLogger(task *Task, opts *taskOpts, logger zerolog.Logger) zerolog.Logger { logOpts := logger.With(). Str("task.name", task.name). Str("task.group", string(task.group)) - if len(task.logFields) > 0 { - logOpts = logOpts.Fields(task.logFields) + if len(opts.logFields) > 0 { + logOpts = logOpts.Fields(opts.logFields) } taskType := "interval_ticker" - if task.blockChanTicker != nil { + if opts.blockChan != nil { taskType = "block_ticker" } return logOpts.Str("task.type", taskType).Logger() } + +func newTickable(task *Task, opts *taskOpts) tickable { + // Block-based ticker + if opts.blockChan != nil { + return newBlockTicker(task.execute, opts.blockChan, task.logger) + } + + return newIntervalTicker( + task.execute, + opts.interval, + opts.intervalUpdater, + task.name, + task.logger, + ) +} diff --git a/pkg/scheduler/scheduler_test.go b/pkg/scheduler/scheduler_test.go index cc50b04c25..a993bc875a 100644 --- a/pkg/scheduler/scheduler_test.go +++ b/pkg/scheduler/scheduler_test.go @@ -270,6 +270,7 @@ func TestScheduler(t *testing.T) { // ASSERT assert.Equal(t, int64(21), counter) assert.Contains(t, ts.logBuffer.String(), "Stopped scheduler task") + assert.Contains(t, ts.logBuffer.String(), `"task.type":"block_ticker"`) }) t.Run("Block tick: tick is slower than the block", func(t *testing.T) { From 6cc8e435b0b677fe004c50d97f6480d9aaf85326 Mon Sep 17 00:00:00 2001 From: Dmitry S <11892559+swift1337@users.noreply.github.com> Date: Fri, 20 Dec 2024 18:29:41 +0100 Subject: [PATCH 11/11] Add metrics --- pkg/scheduler/metrics.go | 29 +++++++++++++++++++++++++++++ pkg/scheduler/scheduler.go | 13 ++++--------- zetaclient/metrics/metrics.go | 21 +++++++++++++++++++++ 3 files changed, 54 insertions(+), 
9 deletions(-) create mode 100644 pkg/scheduler/metrics.go diff --git a/pkg/scheduler/metrics.go b/pkg/scheduler/metrics.go new file mode 100644 index 0000000000..96d581ebfb --- /dev/null +++ b/pkg/scheduler/metrics.go @@ -0,0 +1,29 @@ +package scheduler + +import ( + "time" + + "github.com/zeta-chain/node/zetaclient/metrics" +) + +// Note that currently the hard-coded "global" metrics are used. +func recordMetrics(task *Task, startedAt time.Time, err error, skipped bool) { + var status string + switch { + case skipped: + status = "skipped" + case err != nil: + status = "failed" + default: + status = "ok" + } + + var ( + group = string(task.group) + name = task.name + dur = time.Since(startedAt).Seconds() + ) + + metrics.SchedulerTaskInvocationCounter.WithLabelValues(status, group, name).Inc() + metrics.SchedulerTaskExecutionDuration.WithLabelValues(status, group, name).Observe(dur) +} diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index d0f931d7a8..2328cbddd7 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -163,22 +163,17 @@ func (t *Task) Stop() { // execute executes Task with additional logging and metrics. func (t *Task) execute(ctx context.Context) error { + startedAt := time.Now().UTC() + // skip tick if t.skipper != nil && t.skipper() { + recordMetrics(t, startedAt, nil, true) return nil } - t.logger.Debug().Msg("Invoking task") - err := t.exec(ctx) - // todo metrics (TBD) - // - duration (time taken) - // - outcome (skip, err, ok) - // - bump invocation counter - // - "last invoked at" timestamp (?) 
- // - chain_id - // - metrics cardinality: "task_group (?)" "task_name", "status", "chain_id" + recordMetrics(t, startedAt, err, false) return err } diff --git a/zetaclient/metrics/metrics.go b/zetaclient/metrics/metrics.go index 28fe897504..36dc5ad813 100644 --- a/zetaclient/metrics/metrics.go +++ b/zetaclient/metrics/metrics.go @@ -170,6 +170,27 @@ var ( Name: "num_connected_peers", Help: "The number of connected peers (authenticated keygen peers)", }) + + // SchedulerTaskInvocationCounter tracks invocations categorized by status, group, and name + SchedulerTaskInvocationCounter = promauto.NewCounterVec( + prometheus.CounterOpts{ + Namespace: ZetaClientNamespace, + Name: "scheduler_task_invocations_total", + Help: "Total number of task invocations", + }, + []string{"status", "task_group", "task_name"}, + ) + + // SchedulerTaskExecutionDuration measures the execution duration of tasks + SchedulerTaskExecutionDuration = promauto.NewHistogramVec( + prometheus.HistogramOpts{ + Namespace: ZetaClientNamespace, + Name: "scheduler_task_duration_seconds", + Help: "Histogram of task execution duration in seconds", + Buckets: []float64{0.05, 0.1, 0.2, 0.3, 0.5, 1, 1.5, 2, 3, 5, 7.5, 10, 15}, // 50ms to 15s + }, + []string{"status", "task_group", "task_name"}, + ) ) // NewMetrics creates a new Metrics instance