Skip to content

Commit

Permalink
Merge pull request #4 from lytics/time-windowing
Browse files Browse the repository at this point in the history
Adding a time windowed queue, so that we can support sliding windows.
  • Loading branch information
epsniff authored Jan 13, 2021
2 parents 84d34e4 + 3669033 commit ffc5f67
Show file tree
Hide file tree
Showing 8 changed files with 474 additions and 71 deletions.
3 changes: 1 addition & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
language: go

go:
- 1.9.x
- 1.10.x
- master

before_install:
- go get -t -v ./...
Expand Down
17 changes: 10 additions & 7 deletions callgroup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"runtime"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand All @@ -17,8 +18,10 @@ func TestCompletion(t *testing.T) {
reslen += len(finalState)
})

op1 := cg.Add(1, &tsMsg{123, 5, "user", 1234567})
op2 := cg.Add(2, &tsMsg{123, 5, "user", 2222222})
now := time.Now()

op1 := cg.Add(1, &tsMsg{123, now})
op2 := cg.Add(2, &tsMsg{123, now})

assert.Equal(t, 0, completed)
assert.Equal(t, 0, reslen)
Expand All @@ -41,8 +44,10 @@ func TestConcurrentDone(t *testing.T) {
})

ops := []*Op{}
now := time.Now()

for i := 0; i < 1000; i++ {
ops = append(ops, cg.Add(uint64(i), &tsMsg{123, 5, "user", uint64(i)}))
ops = append(ops, cg.Add(uint64(i), &tsMsg{123, now}))
}

wgend := sync.WaitGroup{}
Expand All @@ -65,8 +70,6 @@ func TestConcurrentDone(t *testing.T) {
}

type tsMsg struct {
Aid int
Gen int
Table string
RefsID uint64
ID uint64
Time time.Time
}
4 changes: 2 additions & 2 deletions go.test.sh
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
#!/usr/bin/env bash

set -e
echo "" > coverage.txt

echo "" > coverage.txt
for d in $(go list ./... | grep -v vendor); do
go test -race -coverprofile=profile.out -covermode=atomic $d
go test -count=5 -v -race -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
Expand Down
63 changes: 12 additions & 51 deletions opqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,13 @@ var (
ErrQueueSaturatedWidth = fmt.Errorf("queue is saturated (width)")
)

// OpSet represents the set of Ops that have been merged in an OpQueue,
// It provides convenience functions for appending new Ops and for completing them.
type OpSet struct {
set []*Op
}

func newOpSet() *OpSet {
return &OpSet{
set: []*Op{},
}
}

func (os *OpSet) append(op *Op) {
os.set = append(os.set, op)
}

// Ops get the list of ops in this set.
func (os *OpSet) Ops() []*Op {
return os.set
}

// FinishAll a convenience func that calls finish on each Op in the set, passing the
// results or error to all the Ops in the OpSet.
//
// NOTE: The call group that owns this OP will not call it's finish function until all
// Ops are complete. And one callgroup could be spread over multiple op sets or
// multiple op queues.
func (os *OpSet) FinishAll(err error, resp interface{}) {
for _, op := range os.set {
op.Finish(err, resp)
}
}

// OpQueue is a thread-safe duplicate operation suppression queue, that combines
// duplicate operations (queue entries) into sets that will be dequeued together.

//
// For example, If you enqueue an item with a key that already exists, then that
// item will be appended to that key's set of items. Otherwise the item is
// inserted into the head of the list as a new item.

//
// On Dequeue a SET is returned of all items that share a key in the queue.
// It blocks on dequeue if the queue is empty, but returns an error if the
// queue is full during enqueue.
Expand All @@ -72,6 +39,7 @@ type OpQueue struct {
entries map[ID]*OpSet
}

// NewOpQueue create a new OpQueue.
func NewOpQueue(depth, width int) *OpQueue {
cond := sync.NewCond(&sync.Mutex{})
myctx, can := context.WithCancel(context.Background())
Expand Down Expand Up @@ -123,7 +91,9 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error {

set, ok := q.entries[id]
if !ok {
set = newOpSet()
set = newOpSet(op)
q.entries[id] = set

// This is a new item, so we need to insert it into the queue.
q.enqueue(id)

Expand All @@ -144,15 +114,11 @@ func (q *OpQueue) Enqueue(id ID, op *Op) error {
// the condition lock until this method call returns, finishing
// its append of the new operation.
q.cond.Signal()
}

if len(set.Ops()) >= q.width {
} else if len(set.Ops()) >= q.width {
return ErrQueueSaturatedWidth
} else {
set.append(op)
}

set.append(op)
q.entries[id] = set

return nil
}

Expand All @@ -176,16 +142,11 @@ func (q *OpQueue) Dequeue() (*OpSet, bool) {
return nil, false
default:
}
// release the lock and wait until signaled. On awake we'll require the lock.
// After wait requires the lock we have to recheck the wait condition
// (calling q.dequeue), because it's possible that someone else
// release the lock and wait until signaled. On awake we'll acquire the lock.
// After wait acquires the lock we have to recheck the wait condition,
// because it's possible that someone else
// drained the queue while, we were reacquiring the lock.
q.cond.Wait()
select {
case <-q.ctx.Done():
return nil, false
default:
}
}
}

Expand Down
80 changes: 71 additions & 9 deletions opqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package inflight
import (
"context"
"sync"
"sync/atomic"
"testing"
"time"

Expand All @@ -29,10 +30,11 @@ func TestOpQueue(t *testing.T) {
completed2++
})

op1_1 := cg1.Add(1, &tsMsg{123, 5, "user", 1234567})
op1_2 := cg1.Add(2, &tsMsg{111, 6, "user", 2222222})
op2_1 := cg2.Add(1, &tsMsg{123, 5, "user", 1234567})
op2_2 := cg2.Add(2, &tsMsg{111, 6, "user", 2222222})
now := time.Now()
op1_1 := cg1.Add(1, &tsMsg{123, now})
op1_2 := cg1.Add(2, &tsMsg{111, now})
op2_1 := cg2.Add(1, &tsMsg{123, now})
op2_2 := cg2.Add(2, &tsMsg{111, now})

opq := NewOpQueue(10, 10)
defer opq.Close()
Expand Down Expand Up @@ -85,9 +87,10 @@ func TestOpQueueClose(t *testing.T) {
})

opq := NewOpQueue(10, 10)
now := time.Now()

for i := 0; i < 9; i++ {
op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
err := opq.Enqueue(op.Key, op)
assert.Equal(t, nil, err)
}
Expand Down Expand Up @@ -127,8 +130,10 @@ func TestOpQueueFullDepth(t *testing.T) {
succuess := 0
depthErrors := 0
widthErrors := 0
now := time.Now()

for i := 0; i < 100; i++ {
op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
err := opq.Enqueue(op.Key, op)
switch err {
case nil:
Expand Down Expand Up @@ -171,8 +176,10 @@ func TestOpQueueFullWidth(t *testing.T) {
succuess := 0
depthErrors := 0
widthErrors := 0
now := time.Now()

for i := 0; i < 100; i++ {
op := cg1.Add(1, &tsMsg{i, i, "user", 2222222})
op := cg1.Add(1, &tsMsg{uint64(i), now})
err := opq.Enqueue(op.Key, op)
switch err {
case nil:
Expand Down Expand Up @@ -227,6 +234,8 @@ func TestOpQueueForRaceDetection(t *testing.T) {
finishLine, finish := context.WithCancel(context.Background())
dequeFinishLine, deqFinish := context.WithCancel(context.Background())
const concurrency = 2
now := time.Now()

for w := 0; w < concurrency; w++ {
go func(w int) {
startingLine1.Wait()
Expand All @@ -237,7 +246,7 @@ func TestOpQueueForRaceDetection(t *testing.T) {
return
default:
}
op := cg1.Add(uint64(i), &tsMsg{i, i, "user", 2222222})
op := cg1.Add(uint64(i), &tsMsg{uint64(i), now})
err := opq.Enqueue(op.Key, op)
switch err {
case nil:
Expand Down Expand Up @@ -302,5 +311,58 @@ func TestOpQueueForRaceDetection(t *testing.T) {
// NOTE: I get the following performance on my laptop:
// opqueue_test.go:275: enqueue errors: 137075 mergedMsgs:2553 enqueueCnt:231437 dequeueCnt:231437 rate:115718 msgs/sec
// Over 100k msg a sec is more than fast enough for linkgrid...
t.Logf("enqueue errors: [depth:%v width:%v] mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec", depthErrorCnt.Get(), widthErrorCnt.Get(), mergeCnt.Get(), enq, deq, enq/runtime)
t.Logf("Run Stats [note errors are expect for this test]")
t.Logf(" enqueue errors:[depth-errs:%v width-errs:%v]", depthErrorCnt.Get(), widthErrorCnt.Get())
t.Logf(" mergedMsgs:%v enqueueCnt:%v dequeueCnt:%v rate:%v msgs/sec", mergeCnt.Get(), enq, deq, enq/runtime)
}

// TestOpWindowCloseConcurrent exercises Close with many concurrent Dequeue
// callers: workers must block while the queue is empty, each enqueued key
// must be dequeued exactly once, and every worker must unblock with
// ok == false once the queue is closed.
func TestOpWindowCloseConcurrent(t *testing.T) {
	t.Parallel()

	cg1 := NewCallGroup(func(finalState map[ID]*Response) {})
	cg2 := NewCallGroup(func(finalState map[ID]*Response) {})

	now := time.Now()

	op1 := cg1.Add(1, &tsMsg{123, now})
	op2 := cg2.Add(2, &tsMsg{321, now})

	oq := NewOpQueue(300, 500)

	var ops uint64    // count of successful dequeues across all workers
	var closes uint64 // count of workers that observed the closed queue and exited
	const workers int = 12
	for i := 0; i < workers; i++ {
		go func() {
			for {
				e, ok := oq.Dequeue()
				if e != nil {
					assert.True(t, ok)
					atomic.AddUint64(&ops, 1)
				} else {
					// nil set signals the queue was closed
					assert.False(t, ok)
					break
				}
			}
			atomic.AddUint64(&closes, 1)
		}()
	}

	// NOTE(review): the sleeps below make this test timing-dependent; they
	// give the workers time to park in Dequeue before each assertion.
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, uint64(0), atomic.LoadUint64(&ops)) // nothing should have been dequeued yet
	assert.Equal(t, uint64(0), atomic.LoadUint64(&closes))

	err := oq.Enqueue(op1.Key, op1)
	assert.Equal(t, nil, err)
	err = oq.Enqueue(op2.Key, op2)
	assert.Equal(t, nil, err)

	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // 2 uniq keys are enqueued
	assert.Equal(t, uint64(0), atomic.LoadUint64(&closes))

	oq.Close()
	time.Sleep(100 * time.Millisecond)
	assert.Equal(t, uint64(2), atomic.LoadUint64(&ops)) // we still only had 2 uniq keys seen
	assert.Equal(t, uint64(workers), atomic.LoadUint64(&closes))
}
38 changes: 38 additions & 0 deletions opset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package inflight

import "time"

// OpSet represents the set of Ops that have been merged in an OpQueue.
// It provides convenience functions for appending new Ops and for completing them.
type OpSet struct {
	set []*Op
	// enqueuedAt is used by the opWindow to determine when it's ok to dequeue.
	enqueuedAt time.Time
}

// newOpSet returns an OpSet whose membership starts out as just the
// single operation op.
func newOpSet(op *Op) *OpSet {
	ops := make([]*Op, 1)
	ops[0] = op
	return &OpSet{set: ops}
}

// append adds op to this set of merged operations.
func (os *OpSet) append(op *Op) {
	os.set = append(os.set, op)
}

// Ops returns the list of ops in this set. The returned slice is the
// set's internal storage, not a copy.
func (os *OpSet) Ops() []*Op {
	return os.set
}

// FinishAll is a convenience func that calls Finish on each Op in the set,
// passing the same error and response to every Op in the OpSet.
//
// NOTE: The call group that owns an Op will not call its finish function
// until all of its Ops are complete. And one callgroup could be spread over
// multiple op sets or multiple op queues.
func (os *OpSet) FinishAll(err error, resp interface{}) {
	for _, op := range os.set {
		op.Finish(err, resp)
	}
}
Loading

0 comments on commit ffc5f67

Please sign in to comment.