From 63293b3572880212a98bac19d8a1a407b513f397 Mon Sep 17 00:00:00 2001
From: eric
Date: Sun, 18 Mar 2018 10:56:28 -0700
Subject: [PATCH] adding initial copies of the code

---
 Readme.md            | 110 ++++++++++++++++++++
 callgroup.go         | 109 +++++++++++++++++++
 callgroup_test.go    |  72 +++++++++++++
 doc.go               |  15 +++
 example/main.go      |  53 ++++++++++
 example_test.go      |  55 ++++++++++
 opqueue.go           | 205 ++++++++++++++++++++++++++++++++++++
 opqueue_test.go      | 241 +++++++++++++++++++++++++++++++++++++++++++
 testutils/atomics.go |  34 ++++++
 9 files changed, 894 insertions(+)
 create mode 100644 Readme.md
 create mode 100644 callgroup.go
 create mode 100644 callgroup_test.go
 create mode 100644 doc.go
 create mode 100644 example/main.go
 create mode 100644 example_test.go
 create mode 100644 opqueue.go
 create mode 100644 opqueue_test.go
 create mode 100644 testutils/atomics.go

diff --git a/Readme.md b/Readme.md
new file mode 100644
index 0000000..705d992
--- /dev/null
+++ b/Readme.md
@@ -0,0 +1,110 @@
# Inflight - primitives for coordinating interdependent operations in distributed systems

The inflight package provides primitives (data structures) for managing inflight
operations that are being processed in a distributed system.

## CallGroup

CallGroup spawns off a group of operations for each call to `Add()` and
calls the `CallGroupCompletion` func when the last operation has
completed. The CallGroupCompletion func can be thought of as a finalizer where
one can gather errors and/or results from the function calls.

Example usage:
```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/lytics/inflight"
)

func main() {
	data := []string{"1:2", "5:6:7", "1:2", "5:6:7"}
	total := int64(0)
	wg := sync.WaitGroup{}
	wg.Add(1)
	cg := inflight.NewCallGroup(func(results map[inflight.ID]*inflight.Response) {
		for _, res := range results {
			subtotal := res.Result.(int)
			atomic.AddInt64(&total, int64(subtotal))
		}
		wg.Done()
	})

	startingLine := sync.WaitGroup{}
	startingLine.Add(1) // block all goroutines until the loop has finished spinning them up; otherwise we have a race.

	// Spawn off the workers.
	for id, entry := range data {
		op := cg.Add(uint64(id), entry)
		go func(op *inflight.Op) {
			startingLine.Wait() // wait here until signaled to start.
			str := op.Msg.(string)
			subtotal := 0
			for _, val := range strings.Split(str, ":") {
				i, _ := strconv.ParseInt(val, 10, 64)
				subtotal += int(i)
			}
			op.Finish(nil, subtotal)
		}(op)
	}
	startingLine.Done() // drop the checkered flag and signal all the workers to begin.

	// Wait for the completion function to finish.
	wg.Wait()
	totalVal := atomic.LoadInt64(&total)
	if totalVal != 42 {
		// total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
		fmt.Printf("total not equal 42, got:%v\n", totalVal)
	} else {
		fmt.Printf("got the expected total of %v\n", totalVal)
	}
}
```

## OpQueue

OpQueue is a thread-safe duplicate-operation suppression queue that combines
duplicate operations (queue entries) into sets that will be dequeued together.

For example, if you enqueue an item whose key already exists in the queue, the
item is appended to that key's set of items. Otherwise the item is inserted at
the head of the list as a new entry.

Dequeue returns the set of all items in the queue that share a key. Dequeue
blocks while the queue is empty, whereas Enqueue returns an error if the queue
is full.

```
                 +------------Width------------>

        |     +-----+
        |     |ID   |
        |     |923  |
        |     +-----+
        |        |
        |        |
        |        v
 Height |     +-----+     +-----+     +-----+
        |     |ID   +---->|ID   +---->|ID   |
        |     |424  |     |424  |     |424  |
        |     +-----+     +-----+     +-----+
        |        |
        |        |
        |        v
        |     +-----+
        |     |ID   |
        |     |99   |
        |     +-----+
        v
```
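Putting both pieces together: a minimal sketch (an editor's illustration, not one of the files in this patch) using only the API defined below in `callgroup.go` and `opqueue.go`. Two call groups each submit an op under the same ID, the queue merges the duplicates into one OpSet, and one `FinishAll` call completes both groups:

```go
package main

import (
	"fmt"

	"github.com/lytics/inflight"
)

func main() {
	cg1 := inflight.NewCallGroup(func(map[inflight.ID]*inflight.Response) { fmt.Println("cg1 complete") })
	cg2 := inflight.NewCallGroup(func(map[inflight.ID]*inflight.Response) { fmt.Println("cg2 complete") })

	q := inflight.NewOpQueue(10, 5) // up to 10 unique IDs, up to 5 merged ops per ID
	defer q.Close()

	// Both ops share ID 7, so the queue merges them into a single OpSet.
	op1 := cg1.Add(7, "fetch:user:7")
	op2 := cg2.Add(7, "fetch:user:7")
	_ = q.Enqueue(op1.Key, op1)
	_ = q.Enqueue(op2.Key, op2)

	if set, ok := q.Dequeue(); ok {
		fmt.Printf("dequeued %d merged ops\n", len(set.Ops())) // 2
		set.FinishAll(nil, "result")                           // completes both call groups
	}
}
```

Because the two ops share an ID, a single Dequeue hands back both, which is the duplicate suppression the diagram above describes.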
diff --git a/callgroup.go b/callgroup.go
new file mode 100644
index 0000000..bf4997a
--- /dev/null
+++ b/callgroup.go
@@ -0,0 +1,109 @@
package inflight

import (
	"sync"
)

// ID identifies an operation; duplicate operations share an ID.
type ID uint64

// CallGroup spawns off a group of operations for each call to Add() and
// calls the CallGroupCompletion func when the last operation has completed.
// The CallGroupCompletion func can be thought of as a finalizer where one
// can gather errors and/or results from the function calls.
//
// Call Add for all inflight tasks before the first call to Finish. Once the
// last task finishes and the CallGroupCompletion is triggered, all future
// calls to Add will be ignored and orphaned.
type CallGroup struct {
	mu sync.Mutex

	cgcOnce             sync.Once
	callGroupCompletion CallGroupCompletion

	outstandingOps map[ID]*Op
	finalState     map[ID]*Response
}

// NewCallGroup returns a new CallGroup.
// It takes a CallGroupCompletion func as an argument, which will be called
// when the last Op in the CallGroup has called Finish.
//
// In a way a CallGroup is like a mapper-reducer in other frameworks, with
// the Ops being mapped out to workers and the CallGroupCompletion being the
// reduce step.
func NewCallGroup(cgc CallGroupCompletion) *CallGroup {
	return &CallGroup{
		outstandingOps:      map[ID]*Op{},
		finalState:          map[ID]*Response{},
		callGroupCompletion: cgc,
		cgcOnce:             sync.Once{},
	}
}

// Add registers an operation with the group under the key k and returns the
// Op whose Finish method reports the operation's terminal state.
func (cg *CallGroup) Add(k uint64, msg interface{}) *Op {
	key := ID(k)

	op := &Op{
		cg:  cg,
		Key: key,
		Msg: msg,
	}

	cg.mu.Lock()
	defer cg.mu.Unlock()

	cg.outstandingOps[key] = op

	return op
}

// ops is used by the package to extract the active ops for this callgroup.
func (cg *CallGroup) ops() map[ID]*Op {
	return cg.outstandingOps
}

func (cg *CallGroup) done() {
	if len(cg.outstandingOps) > 0 {
		return
	}

	cg.cgcOnce.Do(func() {
		// callGroupCompletion should never be nil, so let it panic if it is.
		cg.callGroupCompletion(cg.finalState)
	})
}

// CallGroupCompletion is the reducer function for a callgroup. It is called
// once all Ops in the callgroup have called Finish, and the final state is
// passed to this function.
type CallGroupCompletion func(finalState map[ID]*Response)

// Op represents one inflight operation or message. When this Op's Finish
// func is called, the result for this Op is added to the finalState. When
// all Ops in the callgroup have called Finish, the CallGroup's
// CallGroupCompletion func is called with the final state for all Ops.
type Op struct {
	cg  *CallGroup
	Key ID
	Msg interface{}
}

// Finish records the operation's error or result and, if this was the last
// outstanding Op in its callgroup, triggers the CallGroupCompletion.
func (o *Op) Finish(err error, resp interface{}) {
	o.cg.mu.Lock()
	defer o.cg.mu.Unlock()

	if err != nil {
		o.cg.finalState[o.Key] = &Response{Op: o, Err: err}
	} else {
		o.cg.finalState[o.Key] = &Response{Op: o, Result: resp}
	}
	delete(o.cg.outstandingOps, o.Key)

	o.cg.done()
}

// Response holds the terminal state of one Op: the Op itself plus either an
// error or a result.
type Response struct {
	Op     *Op
	Err    error
	Result interface{}
}
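One detail worth calling out (an editor's sketch, not part of the patch): Finish stores either Err or Result on a Response, never both, so the completion callback is the natural place to split failures from successes:

```go
package main

import (
	"errors"
	"fmt"

	"github.com/lytics/inflight"
)

func main() {
	cg := inflight.NewCallGroup(func(finalState map[inflight.ID]*inflight.Response) {
		for id, res := range finalState {
			if res.Err != nil {
				fmt.Printf("op %d failed: %v\n", id, res.Err) // set by Finish(err, nil)
				continue
			}
			fmt.Printf("op %d succeeded: %v\n", id, res.Result)
		}
	})

	okOp := cg.Add(1, "payload-1")
	badOp := cg.Add(2, "payload-2")

	okOp.Finish(nil, "done")                         // records a Result
	badOp.Finish(errors.New("backend timeout"), nil) // records an Err; last Finish triggers completion
}
```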
diff --git a/callgroup_test.go b/callgroup_test.go
new file mode 100644
index 0000000..71fbd8f
--- /dev/null
+++ b/callgroup_test.go
@@ -0,0 +1,72 @@
package inflight

import (
	"runtime"
	"sync"
	"testing"

	"github.com/bmizerany/assert"
)

func TestCompletion(t *testing.T) {
	t.Parallel()
	completed := 0
	reslen := 0
	cg := NewCallGroup(func(finalState map[ID]*Response) {
		completed++
		reslen += len(finalState)
	})

	op1 := cg.Add(1, &tsMsg{123, 5, "user", 1234567})
	op2 := cg.Add(2, &tsMsg{123, 5, "user", 2222222})

	assert.T(t, completed == 0)
	assert.T(t, reslen == 0)
	op1.Finish(nil, nil)
	assert.T(t, completed == 0)
	assert.T(t, reslen == 0)
	op2.Finish(nil, nil)
	assert.T(t, completed == 1)
	assert.T(t, reslen == 2)
}

func TestConcurrentDone(t *testing.T) {
	runtime.GOMAXPROCS(16)
	t.Parallel()
	completed := 0
	reslen := 0
	cg := NewCallGroup(func(finalState map[ID]*Response) {
		completed++
		reslen += len(finalState)
	})

	ops := []*Op{}
	for i := 0; i < 1000; i++ {
		ops = append(ops, cg.Add(uint64(i), &tsMsg{123, 5, "user", uint64(i)}))
	}

	wgend := sync.WaitGroup{}
	wgstart := sync.WaitGroup{}
	wgstart.Add(1)

	for i := 0; i < 1000; i++ {
		wgend.Add(1)
		go func(id int) {
			defer wgend.Done()
			wgstart.Wait() // block until the testcase signals all goroutines to fire at once.
			ops[id].Finish(nil, nil)
		}(i)
	}
	wgstart.Done() // start all goroutines at the same time.
	wgend.Wait()

	assert.T(t, completed == 1)
	assert.T(t, reslen == 1000)
}

// tsMsg is a test fixture standing in for a real inflight message.
type tsMsg struct {
	Aid    int
	Gen    int
	Table  string
	RefsID uint64
}
diff --git a/doc.go b/doc.go
new file mode 100644
index 0000000..32e6c8b
--- /dev/null
+++ b/doc.go
@@ -0,0 +1,15 @@
/*
Package inflight provides primitives (data structures) for managing inflight
operations that are being processed in a distributed system.

CallGroup spawns off a group of operations for each call to Add() and calls
the CallGroupCompletion func when the last operation has completed. The
CallGroupCompletion func can be thought of as a finalizer where one can
gather errors and/or results from the function calls.

OpQueue is a thread-safe duplicate-operation suppression queue that combines
duplicate operations (queue entries) into sets that will be dequeued together.
*/
package inflight
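Both the runnable example and the test below gate their workers on a second WaitGroup so that no worker starts before the spawn loop finishes. Distilled into a standalone sketch (an editor's illustration of the pattern, not part of the patch):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	gate := sync.WaitGroup{}
	gate.Add(1) // hold every worker at the starting line

	done := sync.WaitGroup{}
	for i := 0; i < 4; i++ {
		done.Add(1)
		go func(id int) {
			defer done.Done()
			gate.Wait() // park here until the gate opens
			fmt.Println("worker", id, "running")
		}(i)
	}

	gate.Done() // open the gate: every worker starts at once
	done.Wait()
}
```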
diff --git a/example/main.go b/example/main.go
new file mode 100644
index 0000000..443ceb4
--- /dev/null
+++ b/example/main.go
@@ -0,0 +1,53 @@
package main

import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"sync/atomic"

	"github.com/lytics/inflight"
)

func main() {
	data := []string{"1:2", "5:6:7", "1:2", "5:6:7"}
	total := int64(0)
	wg := sync.WaitGroup{}
	wg.Add(1)
	cg := inflight.NewCallGroup(func(results map[inflight.ID]*inflight.Response) {
		for _, res := range results {
			subtotal := res.Result.(int)
			atomic.AddInt64(&total, int64(subtotal))
		}
		wg.Done()
	})

	startingLine := sync.WaitGroup{}
	startingLine.Add(1) // block all goroutines until the loop has finished spinning them up; otherwise we have a race.

	// Spawn off the workers.
	for id, entry := range data {
		op := cg.Add(uint64(id), entry)
		go func(op *inflight.Op) {
			startingLine.Wait() // wait here until signaled to start.
			str := op.Msg.(string)
			subtotal := 0
			for _, val := range strings.Split(str, ":") {
				i, _ := strconv.ParseInt(val, 10, 64)
				subtotal += int(i)
			}
			op.Finish(nil, subtotal)
		}(op)
	}
	startingLine.Done() // drop the checkered flag and signal all the workers to begin.

	// Wait for the completion function to finish.
	wg.Wait()
	totalVal := atomic.LoadInt64(&total)
	if totalVal != 42 {
		// total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
		fmt.Printf("total not equal 42, got:%v\n", totalVal)
	} else {
		fmt.Printf("got the expected total of %v\n", totalVal)
	}
}
diff --git a/example_test.go b/example_test.go
new file mode 100644
index 0000000..20d5ca0
--- /dev/null
+++ b/example_test.go
@@ -0,0 +1,55 @@
package inflight_test

import (
	"strconv"
	"strings"
	"sync"
	"testing"

	"github.com/lytics/inflight"
	"github.com/lytics/inflight/testutils"
)

// TestExample1 uses the callgroup to do a concurrent map-reduce: it splits and
// parses an array of strings to calculate the subtotal for each entry, then the
// completion func reduces the subtotals into a total.
func TestExample1(t *testing.T) {
	t.Parallel()
	data := []string{"1:2", "5:6:7", "1:2", "5:6:7"}
	total := testutils.AtomicInt{}
	wg := sync.WaitGroup{}
	wg.Add(1)
	cg := inflight.NewCallGroup(func(results map[inflight.ID]*inflight.Response) {
		for _, res := range results {
			subtotal := res.Result.(int)
			total.IncrBy(subtotal)
		}
		wg.Done()
	})

	startingLine := sync.WaitGroup{}
	startingLine.Add(1) // block all goroutines until the loop has finished spinning them up; otherwise we have a race.

	// Spawn off the workers.
	for id, entry := range data {
		op := cg.Add(uint64(id), entry)
		go func(op *inflight.Op) {
			startingLine.Wait() // wait here until signaled to start.
			str := op.Msg.(string)
			subtotal := 0
			for _, val := range strings.Split(str, ":") {
				i, _ := strconv.ParseInt(val, 10, 64)
				subtotal += int(i)
			}
			op.Finish(nil, subtotal)
		}(op)
	}
	startingLine.Done() // drop the checkered flag and signal all the workers to begin.

	// Wait for the completion function to finish.
	wg.Wait()
	totalVal := total.Get()
	if totalVal != 42 {
		// total == (1 + 2) + (5 + 6 + 7) + (1 + 2) + (5 + 6 + 7) == 42
		t.Fatalf("total not equal 42, got:%v", totalVal)
	}
}
diff --git a/opqueue.go b/opqueue.go
new file mode 100644
index 0000000..a204b8a
--- /dev/null
+++ b/opqueue.go
@@ -0,0 +1,205 @@
package inflight

import (
	"container/list"
	"context"
	"fmt"
	"sync"
)

// ErrQueueSaturated is returned by Enqueue when the queue is at capacity in
// either height (unique IDs) or width (merged ops per ID).
var ErrQueueSaturated = fmt.Errorf("queue is saturated")

/*
OpSet represents the set of Ops that have been merged in an OpQueue.
It provides convenience functions for appending new Ops and for completing them.
*/
type OpSet struct {
	set []*Op
}

func newOpSet() *OpSet {
	return &OpSet{
		set: []*Op{},
	}
}

func (os *OpSet) append(op *Op) {
	os.set = append(os.set, op)
}

// Ops returns the merged operations in this set.
func (os *OpSet) Ops() []*Op {
	return os.set
}

// FinishAll is a convenience func that calls Finish on each Op in the set,
// passing the same result or error to all of them.
//
// NOTE: the callgroup that owns an Op will not call its completion function
// until all of its Ops are complete, and one callgroup can be spread over
// multiple op sets or even multiple op queues.
func (os *OpSet) FinishAll(err error, resp interface{}) {
	for _, op := range os.set {
		op.Finish(err, resp)
	}
}

/*
OpQueue is a thread-safe duplicate-operation suppression queue that combines
duplicate operations (queue entries) into sets that will be dequeued together.

For example, if you enqueue an item whose key already exists in the queue,
the item is appended to that key's set of items. Otherwise the item is
inserted into the head of the list as a new entry.

Dequeue returns the set of all items in the queue that share a key. Dequeue
blocks while the queue is empty, whereas Enqueue returns an error if the
queue is full.
*/
type OpQueue struct {
	cond *sync.Cond
	ctx  context.Context
	can  context.CancelFunc

	height  int
	width   int
	q       *list.List
	entries map[ID]*OpSet
}

// NewOpQueue creates an OpQueue holding at most height unique IDs, with at
// most width merged ops per ID.
func NewOpQueue(height, width int) *OpQueue {
	cond := sync.NewCond(&sync.Mutex{})
	myctx, can := context.WithCancel(context.Background())
	q := &OpQueue{
		cond:    cond,
		ctx:     myctx,
		can:     can,
		height:  height,
		width:   width,
		q:       list.New(),
		entries: map[ID]*OpSet{},
	}
	go func() {
		<-myctx.Done()
		q.cond.L.Lock()
		defer q.cond.L.Unlock()
		cond.Broadcast() // alert all dequeue calls that they should wake up and return.
	}()
	return q
}

// Close releases resources associated with this queue by canceling its
// context. The owner of this OpQueue should either call Close or cancel the
// context; the two are equivalent.
func (q *OpQueue) Close() {
	q.can()
}

// Len returns the number of unique IDs in the queue, i.e. the current height
// of the queue.
func (q *OpQueue) Len() int {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()
	return q.q.Len()
}

// Enqueue adds the op to the queue. If the ID already exists, the Op is
// added to the existing OpSet for that ID; otherwise it is inserted as a
// new OpSet.
//
// Enqueue doesn't block if the queue is full; instead it returns an
// ErrQueueSaturated error.
func (q *OpQueue) Enqueue(id ID, op *Op) error {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	if q.q.Len() >= q.height {
		return ErrQueueSaturated
	}

	set, ok := q.entries[id]
	if !ok {
		set = newOpSet()
		// This is a new item, so we need to insert it into the queue.
		q.enqueue(id)

		// Signal one waiting goroutine to wake up and Dequeue. We only need
		// to signal when a new ID is enqueued. Consider the possible states
		// the queue can be in:
		//   1. No goroutine is waiting in Dequeue: the signal is lost, and
		//      all items will be dequeued on the next call to Dequeue.
		//   2. One or many goroutines are blocked in Dequeue because the
		//      queue is empty: Signal wakes one, which dequeues the new
		//      item and returns.
		//   3. At most one goroutine is in the act of dequeueing existing
		//      items (only one can hold the lock inside Dequeue's loop), in
		//      which case the signal is ignored and we return to state (1).
		// Note that signaled goroutines cannot acquire the condition's lock
		// until this method returns, i.e. until the append of the new
		// operation has finished.
		q.cond.Signal()
	}

	if len(set.Ops()) >= q.width {
		return ErrQueueSaturated
	}

	set.append(op)
	q.entries[id] = set

	return nil
}
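// enqueueBlocking is an editor's sketch (not part of the original patch) of
// treating ErrQueueSaturated as backpressure: retry until another goroutine
// dequeues and makes room. A real caller would add a sleep or a context
// check inside the loop rather than busy-spinning like this.
func enqueueBlocking(q *OpQueue, op *Op) error {
	for {
		err := q.Enqueue(op.Key, op)
		if err != ErrQueueSaturated {
			// nil on success, or an unexpected error; either way stop retrying.
			return err
		}
		// Saturated: room appears only after a Dequeue removes an OpSet.
	}
}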
// Dequeue removes the oldest OpSet from the queue and returns it.
// Dequeue blocks if the queue is empty; a later Enqueue wakes the blocked
// goroutine so it can continue.
//
// If the OpQueue is closed, Dequeue returns false for the second parameter.
func (q *OpQueue) Dequeue() (*OpSet, bool) {
	q.cond.L.Lock()
	defer q.cond.L.Unlock()

	for {
		if set, ok := q.dequeue(); ok {
			return set, true
		}
		select {
		case <-q.ctx.Done():
			return nil, false
		default:
		}
		// Release the lock and wait until signaled. On wake we reacquire the
		// lock and must recheck the wait condition (by calling q.dequeue
		// again), because another goroutine may have drained the queue while
		// we were waiting.
		q.cond.Wait()
		select {
		case <-q.ctx.Done():
			return nil, false
		default:
		}
	}
}

func (q *OpQueue) enqueue(id ID) {
	q.q.PushBack(id)
}

func (q *OpQueue) dequeue() (*OpSet, bool) {
	elem := q.q.Front()
	if elem == nil {
		return nil, false
	}
	idt := q.q.Remove(elem)
	id := idt.(ID)

	set, ok := q.entries[id]
	if !ok {
		panic("invariant broken: we dequeued a value that isn't in the map")
	}
	delete(q.entries, id)
	return set, true
}
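On the consumer side, Dequeue blocks while the queue is empty and returns false once the queue is closed, which suggests a worker loop like the following (an editor's sketch; `process` is a hypothetical handler, not part of the package):

```go
package main

import (
	"fmt"

	"github.com/lytics/inflight"
)

// process is a hypothetical handler standing in for real work.
func process(ops []*inflight.Op) interface{} {
	return fmt.Sprintf("handled %d merged ops", len(ops))
}

func main() {
	q := inflight.NewOpQueue(100, 10)

	done := make(chan struct{})
	go func() {
		defer close(done)
		for {
			set, ok := q.Dequeue() // blocks while the queue is empty
			if !ok {
				return // queue closed: drain loop exits
			}
			set.FinishAll(nil, process(set.Ops()))
		}
	}()

	// ... producers would Enqueue ops here ...

	q.Close() // wakes the blocked Dequeue, which then returns ok == false
	<-done
}
```

Close cancels the queue's context and broadcasts on the condition variable, so a Dequeue blocked on an empty queue exits cleanly.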
diff --git a/opqueue_test.go b/opqueue_test.go
new file mode 100644
index 0000000..d64ffaf
--- /dev/null
+++ b/opqueue_test.go
@@ -0,0 +1,241 @@
package inflight

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/bmizerany/assert"
	"github.com/lytics/inflight/testutils"
)

/*
To ensure consistency, run this test repeatedly with the race detector; if it
never fails after about 5 minutes, we can consider the testcases consistent:
  while go test -v --race ; do echo `date` ; done
*/

func TestOpQueue(t *testing.T) {
	t.Parallel()
	completed1 := 0
	completed2 := 0
	cg1 := NewCallGroup(func(finalState map[ID]*Response) {
		completed1++
	})

	cg2 := NewCallGroup(func(finalState map[ID]*Response) {
		completed2++
	})

	op1_1 := cg1.Add(1, &tsMsg{123, 5, "user", 1234567})
	op1_2 := cg1.Add(2, &tsMsg{111, 6, "user", 2222222})
	op2_1 := cg2.Add(1, &tsMsg{123, 5, "user", 1234567})
	op2_2 := cg2.Add(2, &tsMsg{111, 6, "user", 2222222})

	opq := NewOpQueue(10, 10)
	defer opq.Close()

	{
		err := opq.Enqueue(op1_1.Key, op1_1)
		assert.T(t, err == nil)
		err = opq.Enqueue(op2_1.Key, op2_1)
		assert.T(t, err == nil)
		err = opq.Enqueue(op1_2.Key, op1_2)
		assert.T(t, err == nil)
		err = opq.Enqueue(op2_2.Key, op2_2)
		assert.T(t, err == nil)
	}

	set1, open := opq.Dequeue()
	assert.T(t, open)
	set2, open := opq.Dequeue()
	assert.T(t, open)

	{
		assert.T(t, len(set1.Ops()) == 2)
		assert.T(t, len(set2.Ops()) == 2)
	}

	{
		// Each set contains one op from each callgroup, so we have to
		// complete both sets before completed1/completed2 increment.
		assert.T(t, completed1 == 0)
		assert.T(t, completed2 == 0)
		set := set1.Ops()
		set[0].Finish(nil, nil)
		set[1].Finish(nil, nil)
		assert.T(t, completed1 == 0)
		assert.T(t, completed2 == 0)

		set = set2.Ops()
		set[0].Finish(nil, nil)
		set[1].Finish(nil, nil)
		assert.T(t, completed1 == 1)
		assert.T(t, completed2 == 1)
	}
}

func TestOpQueueClose(t *testing.T) {
	t.Parallel()
	completed1 := 0
	cg1 := NewCallGroup(func(finalState map[ID]*Response) {
		completed1++
	})

	opq := NewOpQueue(10, 10)

	for i := 0; i < 9; i++ {
		op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
		err := opq.Enqueue(op.Key, op)
		assert.T(t, err == nil)
	}

	timer := time.AfterFunc(5*time.Second, func() {
		// t.Fatalf must not be called off the test goroutine, so panic instead.
		panic("testcase timed out after 5 secs")
	})
	for i := 0; i < 9; i++ {
		set1, open := opq.Dequeue()
		assert.T(t, open)
		assert.Tf(t, len(set1.Ops()) == 1, " at loop:%v set1_len:%v", i, len(set1.Ops()))
	}
	timer.Stop()

	st := time.Now()
	time.AfterFunc(10*time.Millisecond, func() {
		opq.Close() // calling Close should release the call to opq.Dequeue()
	})
	set1, open := opq.Dequeue() // this call should hang until we call Close above.
	assert.T(t, open == false, "expect:false got:%v", open)
	assert.T(t, set1 == nil)
	rt := time.Since(st)
	assert.Tf(t, rt >= 10*time.Millisecond, "we shouldn't have returned until Close was called: returned after:%v", rt)
}

func TestOpQueueFull(t *testing.T) {
	t.Parallel()
	completed1 := 0
	cg1 := NewCallGroup(func(finalState map[ID]*Response) {
		completed1++
	})

	opq := NewOpQueue(10, 10)
	defer opq.Close()

	success := 0
	fullErrors := 0
	for i := 0; i < 100; i++ {
		op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
		err := opq.Enqueue(op.Key, op)
		switch err {
		case nil:
			success++
		case ErrQueueSaturated:
			fullErrors++
		default:
			t.Fatalf("unexpected error: %v", err)
		}
	}
	assert.Equalf(t, success, 10, "expected 10, got:%v", success)
	assert.Equalf(t, fullErrors, 90, "expected 90, got:%v", fullErrors)

	timer := time.AfterFunc(5*time.Second, func() {
		panic("testcase timed out after 5 secs")
	})
	for i := 0; i < success; i++ {
		set1, open := opq.Dequeue()
		assert.T(t, open)
		assert.Tf(t, len(set1.Ops()) == 1, " at loop:%v set1_len:%v", i, len(set1.Ops()))
	}
	timer.Stop()
}

func TestOpQueueForRaceDetection(t *testing.T) {
	t.Parallel()
	completed1 := 0
	cg1 := NewCallGroup(func(finalState map[ID]*Response) {
		completed1++
	})

	enqueueCnt := testutils.AtomicInt{}
	dequeueCnt := testutils.AtomicInt{}
	mergeCnt := testutils.AtomicInt{}
	fullErrorCnt := testutils.AtomicInt{}

	opq := NewOpQueue(300, 500)
	defer opq.Close() // releases any dequeue workers still blocked at test end.

	startingLine := sync.WaitGroup{}
	startingLine.Add(1) // block all goroutines until the loop has finished spinning them up.

	finishLine, finish := context.WithCancel(context.Background())
	dequeFinishLine, deqFinish := context.WithCancel(context.Background())
	const concurrency = 2
	for w := 0; w < concurrency; w++ {
		go func() {
			startingLine.Wait()
			for i := 0; i < 1000000; i++ {
				select {
				case <-finishLine.Done():
					return
				default:
				}
				op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
				err := opq.Enqueue(op.Key, op)
				switch err {
				case nil:
					enqueueCnt.Incr()
				case ErrQueueSaturated:
					fullErrorCnt.Incr()
				default:
					// t.Fatalf must not be called off the test goroutine.
					t.Errorf("unexpected error: %v", err)
					return
				}
			}
		}()
	}

	for w := 0; w < concurrency; w++ {
		go func() {
			startingLine.Wait()
			for {
				select {
				case <-dequeFinishLine.Done():
					return
				default:
				}
				set1, open := opq.Dequeue()
				if !open {
					return // queue was closed.
				}
				dequeueCnt.IncrBy(len(set1.Ops()))
				if len(set1.Ops()) > 1 {
					mergeCnt.Incr()
				}
			}
		}()
	}
	startingLine.Done() // release all the waiting workers.

	const runtimeSecs = 2
	timeout := time.AfterFunc((runtimeSecs+10)*time.Second, func() {
		panic("testcase timed out after 12 secs")
	})
	defer timeout.Stop()

	// Let the testcase run for runtimeSecs seconds.
	time.AfterFunc(runtimeSecs*time.Second, func() {
		finish()
	})
	<-finishLine.Done()
	// Sleep to give the dequeue workers plenty of time to drain the queue before exiting.
	time.Sleep(500 * time.Millisecond)
	deqFinish()

	enq := enqueueCnt.Get()
	deq := dequeueCnt.Get()
	if enq != deq {
		t.Fatalf("enqueueCnt and dequeueCnt should match: enq:%v deq:%v", enq, deq)
	}
	// NOTE: I get the following performance on my laptop:
	//   opqueue_test.go:275: enqueue errors: 137075 mergedMsgs:2553 enqueueCnt:231437 dequeueCnt:231437 rate:115718 msgs/sec
	// Over 100k msgs a sec is more than fast enough for linkgrid...
	t.Logf("enqueue errors: %v mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec",
		fullErrorCnt.Get(), mergeCnt.Get(), enq, deq, enq/runtimeSecs)
}
diff --git a/testutils/atomics.go b/testutils/atomics.go
new file mode 100644
index 0000000..ed76f3c
--- /dev/null
+++ b/testutils/atomics.go
@@ -0,0 +1,34 @@
package testutils

import "sync/atomic"

// AtomicInt provides an atomic int64 with built-in increment and
// decrement helpers for easy counting.
type AtomicInt struct {
	Val int64 `json:"val"`
}

// Set stores value atomically.
func (i *AtomicInt) Set(value int64) {
	atomic.StoreInt64(&i.Val, value)
}

// Get loads the current value atomically.
func (i *AtomicInt) Get() int64 {
	return atomic.LoadInt64(&i.Val)
}

// Incr increments the counter by one.
func (i *AtomicInt) Incr() {
	atomic.AddInt64(&i.Val, 1)
}

// Decr decrements the counter by one.
func (i *AtomicInt) Decr() {
	atomic.AddInt64(&i.Val, -1)
}

// IncrBy increments the counter by the given amount.
func (i *AtomicInt) IncrBy(by int) {
	atomic.AddInt64(&i.Val, int64(by))
}

// DecrBy decrements the counter by the given amount.
func (i *AtomicInt) DecrBy(by int) {
	atomic.AddInt64(&i.Val, -int64(by))
}
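For completeness, a quick sketch of the helper in use (an editor's illustration, not part of the patch):

```go
package main

import (
	"fmt"

	"github.com/lytics/inflight/testutils"
)

func main() {
	c := testutils.AtomicInt{}
	c.Incr()             // 1
	c.IncrBy(5)          // 6
	c.Decr()             // 5
	fmt.Println(c.Get()) // prints 5
}
```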