Skip to content

Commit

Permalink
pkg/utils: add NewSleeperTaskCtx(WorkerCtx)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmank88 committed Oct 23, 2024
1 parent c5c856e commit 2cac614
Showing 1 changed file with 27 additions and 6 deletions.
33 changes: 27 additions & 6 deletions pkg/utils/sleeper_task.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package utils

import (
"context"
"fmt"
"time"

Expand All @@ -13,12 +14,18 @@ type Worker interface {
Name() string
}

// WorkerCtx is like Worker but includes [context.Context].
type WorkerCtx interface {
Work(ctx context.Context)
Name() string
}

// SleeperTask represents a task that waits in the background to process some work.
type SleeperTask struct {
services.StateMachine
worker Worker
worker WorkerCtx
chQueue chan struct{}
chStop chan struct{}
chStop services.StopChan
chDone chan struct{}
chWorkDone chan struct{}
}
Expand All @@ -31,16 +38,27 @@ type SleeperTask struct {
// immediately after it is finished. For this reason you should take care to
// make sure that Worker is idempotent.
// WakeUp does not block.
func NewSleeperTask(worker Worker) *SleeperTask {
func NewSleeperTask(w Worker) *SleeperTask {
return NewSleeperTaskCtx(&worker{w})
}

type worker struct {
Worker
}

func (w *worker) Work(ctx context.Context) { w.Worker.Work() }

// NewSleeperTaskCtx is like NewSleeperTask but accepts a WorkerCtx with a [context.Context].
func NewSleeperTaskCtx(w WorkerCtx) *SleeperTask {
s := &SleeperTask{
worker: worker,
worker: w,
chQueue: make(chan struct{}, 1),
chStop: make(chan struct{}),
chDone: make(chan struct{}),
chWorkDone: make(chan struct{}, 10),
}

_ = s.StartOnce("SleeperTask-"+worker.Name(), func() error {
_ = s.StartOnce("SleeperTask-"+w.Name(), func() error {
go s.workerLoop()
return nil
})
Expand Down Expand Up @@ -98,10 +116,13 @@ func (s *SleeperTask) WorkDone() <-chan struct{} {
func (s *SleeperTask) workerLoop() {
defer close(s.chDone)

ctx, cancel := s.chStop.NewCtx()
defer cancel()

for {
select {
case <-s.chQueue:
s.worker.Work()
s.worker.Work(ctx)
s.workDone()
case <-s.chStop:
return
Expand Down

0 comments on commit 2cac614

Please sign in to comment.