-
Notifications
You must be signed in to change notification settings - Fork 0
/
job_queue.go
58 lines (47 loc) · 988 Bytes
/
job_queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package task
import (
"context"
"github.com/difof/goul/concurrency"
)
type JobHandler func(ctx context.Context, arg any) error
type Job struct {
handler JobHandler
arg any
}
type JobQueue struct {
jobs chan Job
ctx concurrency.CancelContext
lasterr error
closer chan struct{}
}
func NewJobQueue(size int) *JobQueue {
q := &JobQueue{
jobs: make(chan Job, size),
ctx: concurrency.NewCancelContext(context.Background()),
closer: make(chan struct{}),
}
go q.runner()
return q
}
// runner
func (q *JobQueue) runner() {
for {
select {
case <-q.ctx.Done():
q.closer <- struct{}{}
return
case job := <-q.jobs:
q.lasterr = job.handler(q.ctx, job.arg)
}
}
}
// Close closes the queue and waits for all jobs to finish
func (q *JobQueue) Close() error {
q.ctx.Cancel()
<-q.closer
return q.lasterr
}
// Queue adds a new job to the queue
func (q *JobQueue) Queue(job JobHandler, arg ...any) {
q.jobs <- Job{handler: job, arg: arg}
}