Skip to content

Commit

Permalink
Add non-blocking call
Browse files Browse the repository at this point in the history
  • Loading branch information
jsvensson committed Nov 19, 2017
1 parent ea058a8 commit 22dacc6
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 17 deletions.
34 changes: 30 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@
[![Go Report Card](https://goreportcard.com/badge/github.com/jsvensson/minion)](https://goreportcard.com/report/github.com/jsvensson/minion)
[![codecov](https://codecov.io/gh/jsvensson/minion/branch/master/graph/badge.svg)](https://codecov.io/gh/jsvensson/minion)

**Minion** 🍌 is a worker/dispatcher package for distributing jobs across a number of workers. The dispatcher creates the workers and returns a buffered channel of a given length that is used as the job queue. The incoming jobs are distributed to the available workers.
**Minion** 🍌 is a worker/dispatcher package for distributing jobs across a number of workers. The dispatcher creates the workers and is used to enqueue jobs, using either a blocking or non-blocking function. The incoming jobs are distributed to the available workers.

This package is strongly inspired by Marcio Castilho's blog post, [Handling 1 Million Requests per Minute with Go](http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/).

## Creating jobs

A job is created by implementing the `Job` interface:

Expand All @@ -18,7 +22,29 @@ type Job interface {

The implementing struct can contain whatever additional fields it needs to perform the job, as seen in the example below.

This package is strongly inspired by Marcio Castilho's blog post, [Handling 1 Million Requests per Minute with Go](http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/).
## Creating the dispatcher

The dispatcher takes two arguments: the number of workers, and the length of the job queue.

``` go
// Create dispatcher/queue with five workers and a queue size of 10
dispatcher := minion.NewDispatcher(5, 10)

// Start the dispatcher and wait for jobs
dispatcher.Run()
```

## Blocking vs non-blocking

The job enqueueing can be either blocking or non-blocking, as required. The non-blocking call can be used to return a HTTP status if the service is at capacity or similar.

``` go
// Blocking call
dispatcher.Enqueue(job)

// Non-blocking call
blocked := dispatcher.TryEnqueue(job)
```

## Example

Expand All @@ -35,7 +61,7 @@ import (

func main() {
// Create dispatcher/queue with five workers and a queue size of 10
dispatcher, jobQueue := minion.NewDispatcher(5, 10)
dispatcher := minion.NewDispatcher(5, 10)

// Start the dispatcher
dispatcher.Run()
Expand All @@ -51,7 +77,7 @@ func main() {
job := Calculation{i, wg}

// Add job to queue, blocks if the job queue is full
jobQueue <- job
dispatcher.Enqueue(job)
}

// Wait for all jobs to finish
Expand Down
27 changes: 20 additions & 7 deletions dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,13 @@ type Dispatcher struct {
}

// NewDispatcher creates a new work dispatcher with the provided number of workers and buffer size for the job queue.
func NewDispatcher(workers, queueSize int) (*Dispatcher, chan<- Job) {
queue := make(chan Job, queueSize)

dispatcher := &Dispatcher{
func NewDispatcher(workers, queueSize int) *Dispatcher {
return &Dispatcher{
workerPool: make(chan chan Job, workers),
jobQueue: queue,
jobQueue: make(chan Job, queueSize),
maxWorkers: workers,
quit: make(chan bool),
}

return dispatcher, queue
}

// Run starts the dispatcher.
Expand All @@ -43,6 +39,23 @@ func (d *Dispatcher) Stop() {
}()
}

// Enqueue adds a job to the job queue. If the queue is full, the function will block until the queue has slots
// available.
func (d *Dispatcher) Enqueue(job Job) {
d.jobQueue <- job
}

// TryEnqueue will try to enqueue a job, without blocking. It returns true if the job was enqueued, false if the job
// queue is full and the job was unable to be enqueued.
func (d *Dispatcher) TryEnqueue(job Job) bool {
select {
case d.jobQueue <- job:
return true
default:
return false
}
}

func (d *Dispatcher) dispatch() {
for {
select {
Expand Down
50 changes: 44 additions & 6 deletions dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"sync/atomic"
"testing"

"time"

"github.com/jsvensson/minion"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
Expand All @@ -23,6 +25,7 @@ type TestJob struct {
// Perform increments the test counter.
func (tj TestJob) Perform() {
atomic.AddUint64(tj.counter, 1)
time.Sleep(20 * time.Millisecond)
tj.wg.Done()
}

Expand All @@ -34,33 +37,68 @@ func (t *MinionTestSuite) SetupTest() {
t.counter = new(uint64)
}

func (t *MinionTestSuite) TestDispatcherSingleJob() {
disp, jobChan := minion.NewDispatcher(1, 1)
func (t *MinionTestSuite) TestEnqueueSingleJob() {
disp := minion.NewDispatcher(1, 1)
disp.Run()

wg := &sync.WaitGroup{}

wg.Add(1)
jobChan <- TestJob{wg, t.counter}
disp.Enqueue(TestJob{wg, t.counter})
wg.Wait()

actual := atomic.LoadUint64(t.counter)
assert.Equal(t.T(), uint64(1), actual)
}

func (t *MinionTestSuite) TestDispatcherManyJobs() {
disp, jobChan := minion.NewDispatcher(3, 10)
func (t *MinionTestSuite) TestEnqueueManyJobs() {
disp := minion.NewDispatcher(3, 10)
disp.Run()

wg := &sync.WaitGroup{}

for i := 0; i < 50; i++ {
wg.Add(1)
jobChan <- TestJob{wg, t.counter}
disp.Enqueue(TestJob{wg, t.counter})
}

wg.Wait()

actual := atomic.LoadUint64(t.counter)
assert.Equal(t.T(), uint64(50), actual)
}

func (t *MinionTestSuite) TestTryEnqueueSingleJob() {
disp := minion.NewDispatcher(1, 1)
disp.Run()

wg := &sync.WaitGroup{}

wg.Add(1)
disp.TryEnqueue(TestJob{wg, t.counter})
wg.Wait()

actual := atomic.LoadUint64(t.counter)
assert.Equal(t.T(), uint64(1), actual)
}

func (t *MinionTestSuite) TestTryEnqueueBlockedJob() {
disp := minion.NewDispatcher(1, 1)

wg := &sync.WaitGroup{}

// Fist call gets enqueued
wg.Add(1)
enqueued := disp.TryEnqueue(TestJob{wg, t.counter})
assert.True(t.T(), enqueued, "first job should not be blocked")

// Second call never gets enqueued
enqueued = disp.TryEnqueue(TestJob{wg, t.counter})
assert.False(t.T(), enqueued, "second job should be blocked")

disp.Run()

wg.Wait()
actual := atomic.LoadUint64(t.counter)
assert.Equal(t.T(), uint64(1), actual)
}

0 comments on commit 22dacc6

Please sign in to comment.