From 22dacc601dc607611ee54759411f8129022ef9fb Mon Sep 17 00:00:00 2001 From: Johan Svensson Date: Sun, 19 Nov 2017 14:07:37 +0100 Subject: [PATCH] Add non-blocking call --- README.md | 34 +++++++++++++++++++++++++++---- dispatcher.go | 27 ++++++++++++++++++------- dispatcher_test.go | 50 ++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 94 insertions(+), 17 deletions(-) diff --git a/README.md b/README.md index bf1d1ee..6b4771b 100644 --- a/README.md +++ b/README.md @@ -5,7 +5,11 @@ [![Go Report Card](https://goreportcard.com/badge/github.com/jsvensson/minion)](https://goreportcard.com/report/github.com/jsvensson/minion) [![codecov](https://codecov.io/gh/jsvensson/minion/branch/master/graph/badge.svg)](https://codecov.io/gh/jsvensson/minion) -**Minion** 🍌 is a worker/dispatcher package for distributing jobs across a number of workers. The dispatcher creates the workers and returns a buffered channel of a given length that is used as the job queue. The incoming jobs are distributed to the available workers. +**Minion** 🍌 is a worker/dispatcher package for distributing jobs across a number of workers. The dispatcher creates the workers and is used to enqueue jobs, using either a blocking or non-blocking function. The incoming jobs are distributed to the available workers. + +This package is strongly inspired by Marcio Castilho's blog post, [Handling 1 Million Requests per Minute with Go](http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/). + +## Creating jobs A job is created by implementing the `Job` interface: @@ -18,7 +22,29 @@ type Job interface { The implementing struct can contain whatever additional fields it needs to perform the job, as seen in the example below. -This package is strongly inspired by Marcio Castilho's blog post, [Handling 1 Million Requests per Minute with Go](http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/). +## Creating the dispatcher + +The dispatcher takes two arguments: the number of workers, and the length of the job queue. + +``` go +// Create dispatcher/queue with five workers and a queue size of 10 +dispatcher := minion.NewDispatcher(5, 10) + +// Start the dispatcher and wait for jobs +dispatcher.Run() +``` + +## Blocking vs non-blocking + +The job enqueueing can be either blocking or non-blocking, as required. The non-blocking call can be used to return a HTTP status if the service is at capacity or similar. + +``` go +// Blocking call +dispatcher.Enqueue(job) + +// Non-blocking call +blocked := dispatcher.TryEnqueue(job) +``` ## Example @@ -35,7 +61,7 @@ import ( func main() { // Create dispatcher/queue with five workers and a queue size of 10 - dispatcher, jobQueue := minion.NewDispatcher(5, 10) + dispatcher := minion.NewDispatcher(5, 10) // Start the dispatcher dispatcher.Run() @@ -51,7 +77,7 @@ func main() { job := Calculation{i, wg} // Add job to queue, blocks if the job queue is full - jobQueue <- job + dispatcher.Enqueue(job) } // Wait for all jobs to finish diff --git a/dispatcher.go b/dispatcher.go index dcbccfe..537637e 100644 --- a/dispatcher.go +++ b/dispatcher.go @@ -13,17 +13,13 @@ type Dispatcher struct { } // NewDispatcher creates a new work dispatcher with the provided number of workers and buffer size for the job queue. -func NewDispatcher(workers, queueSize int) (*Dispatcher, chan<- Job) { - queue := make(chan Job, queueSize) - - dispatcher := &Dispatcher{ +func NewDispatcher(workers, queueSize int) *Dispatcher { + return &Dispatcher{ workerPool: make(chan chan Job, workers), - jobQueue: queue, + jobQueue: make(chan Job, queueSize), maxWorkers: workers, quit: make(chan bool), } - - return dispatcher, queue } // Run starts the dispatcher. @@ -43,6 +39,23 @@ func (d *Dispatcher) Stop() { }() } +// Enqueue adds a job to the job queue. If the queue is full, the function will block until the queue has slots +// available. +func (d *Dispatcher) Enqueue(job Job) { + d.jobQueue <- job +} + +// TryEnqueue will try to enqueue a job, without blocking. It returns true if the job was enqueued, false if the job +// queue is full and the job was unable to be enqueued. +func (d *Dispatcher) TryEnqueue(job Job) bool { + select { + case d.jobQueue <- job: + return true + default: + return false + } +} + func (d *Dispatcher) dispatch() { for { select { diff --git a/dispatcher_test.go b/dispatcher_test.go index 3d6026a..96f9ab5 100644 --- a/dispatcher_test.go +++ b/dispatcher_test.go @@ -5,6 +5,8 @@ import ( "sync/atomic" "testing" + "time" + "github.com/jsvensson/minion" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/suite" @@ -23,6 +25,7 @@ type TestJob struct { // Perform increments the test counter. func (tj TestJob) Perform() { atomic.AddUint64(tj.counter, 1) + time.Sleep(20 * time.Millisecond) tj.wg.Done() } @@ -34,29 +37,29 @@ func (t *MinionTestSuite) SetupTest() { t.counter = new(uint64) } -func (t *MinionTestSuite) TestDispatcherSingleJob() { - disp, jobChan := minion.NewDispatcher(1, 1) +func (t *MinionTestSuite) TestEnqueueSingleJob() { + disp := minion.NewDispatcher(1, 1) disp.Run() wg := &sync.WaitGroup{} wg.Add(1) - jobChan <- TestJob{wg, t.counter} + disp.Enqueue(TestJob{wg, t.counter}) wg.Wait() actual := atomic.LoadUint64(t.counter) assert.Equal(t.T(), uint64(1), actual) } -func (t *MinionTestSuite) TestDispatcherManyJobs() { - disp, jobChan := minion.NewDispatcher(3, 10) +func (t *MinionTestSuite) TestEnqueueManyJobs() { + disp := minion.NewDispatcher(3, 10) disp.Run() wg := &sync.WaitGroup{} for i := 0; i < 50; i++ { wg.Add(1) - jobChan <- TestJob{wg, t.counter} + disp.Enqueue(TestJob{wg, t.counter}) } wg.Wait() @@ -64,3 +67,38 @@ func (t *MinionTestSuite) TestDispatcherManyJobs() { actual := atomic.LoadUint64(t.counter) assert.Equal(t.T(), uint64(50), actual) } + +func (t *MinionTestSuite) TestTryEnqueueSingleJob() { + disp := minion.NewDispatcher(1, 1) + disp.Run() + + wg := &sync.WaitGroup{} + + wg.Add(1) + disp.TryEnqueue(TestJob{wg, t.counter}) + wg.Wait() + + actual := atomic.LoadUint64(t.counter) + assert.Equal(t.T(), uint64(1), actual) +} + +func (t *MinionTestSuite) TestTryEnqueueBlockedJob() { + disp := minion.NewDispatcher(1, 1) + + wg := &sync.WaitGroup{} + + // Fist call gets enqueued + wg.Add(1) + enqueued := disp.TryEnqueue(TestJob{wg, t.counter}) + assert.True(t.T(), enqueued, "first job should not be blocked") + + // Second call never gets enqueued + enqueued = disp.TryEnqueue(TestJob{wg, t.counter}) + assert.False(t.T(), enqueued, "second job should be blocked") + + disp.Run() + + wg.Wait() + actual := atomic.LoadUint64(t.counter) + assert.Equal(t.T(), uint64(1), actual) +}