diff --git a/Makefile b/Makefile index f83236e..68847b3 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: test test: - go test ./... -v + go test ./... -v --count=1 .PHONY: lint lint: diff --git a/collections/priorityqueue.go b/collections/priorityqueue.go index 938f332..f30b5aa 100644 --- a/collections/priorityqueue.go +++ b/collections/priorityqueue.go @@ -16,7 +16,7 @@ package collections import ( - "cmp" + "errors" "fmt" "iter" "strings" @@ -36,7 +36,7 @@ type MetricsOpts[T comparable] struct { Labels map[string]any Labeller map[string]func(i T) string DurationBuckets []float64 - MetricName string + Name string Disable bool } @@ -109,31 +109,31 @@ func newMetrics[T comparable](opts MetricsOpts[T]) *metrics[T] { } } - if opts.MetricName == "" { - opts.MetricName = "priority_queue" + if opts.Name == "" { + opts.Name = "priority_queue" } return &metrics[T]{ opts: opts, enqueuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: opts.MetricName + "_enqueued_total", + Name: opts.Name + "_enqueued_total", Help: "The total number of enqueued items", }, keys), dedupedTotal: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: opts.MetricName + "_deduped_total", + Name: opts.Name + "_deduped_total", Help: "The total number of enqueued items", }, keys), dequeuedTotal: promauto.NewCounterVec(prometheus.CounterOpts{ - Name: opts.MetricName + "_dequeued_total", + Name: opts.Name + "_dequeued_total", Help: "The total number of dequeued items", }, keys), queueSize: promauto.NewGauge(prometheus.GaugeOpts{ - Name: opts.MetricName + "_size", + Name: opts.Name + "_size", Help: "The current size of the queue", ConstLabels: labels, }), queueDuration: promauto.NewHistogramVec(prometheus.HistogramOpts{ - Name: opts.MetricName + "_duration", + Name: opts.Name + "_duration", Help: "Time an object spent in the queue in milliseconds", Buckets: opts.DurationBuckets, }, keys), @@ -165,17 +165,15 @@ type QueueOpts[T comparable] struct { Metrics MetricsOpts[T] } -func New[T cmp.Ordered](opts QueueOpts[T]) (*Queue[T], error) { - +func NewQueue[T comparable](opts QueueOpts[T]) (*Queue[T], error) { if opts.Dedupe && opts.Equals == nil { - return nil, fmt.Errorf("Dedupe requires Equals function") + return nil, errors.New("dedupe requires Equals function") } if opts.Comparator == nil { - opts.Comparator = func(a, b T) int { - return cmp.Compare(a, b) - } + return nil, errors.New("a comparator function is required") } + return &Queue[T]{ heap: binaryheap.NewWith(func(a, b queueItem[T]) int { return opts.Comparator(a.item, b.item) diff --git a/collections/priorityqueue_test.go b/collections/priorityqueue_test.go index f5c130b..c697f60 100644 --- a/collections/priorityqueue_test.go +++ b/collections/priorityqueue_test.go @@ -12,10 +12,10 @@ import ( . "github.com/onsi/gomega" ) -func TestPriorityQueue(t *testing.T) { +func TestPriorityQueueString(t *testing.T) { g := NewWithT(t) - pq, err := New(QueueOpts[string]{ + pq, err := NewQueue(QueueOpts[string]{ Comparator: strings.Compare, Metrics: MetricsOpts[string]{ Labels: map[string]any{ @@ -52,18 +52,67 @@ func TestPriorityQueue(t *testing.T) { g.Expect("priority_queue_duration_count").To(matchers.MatchCounter(1, "prefix", "i")) g.Expect("priority_queue_size").To(matchers.MatchCounter(0)) +} + +type QueueItem struct { + Timestamp time.Time // Queued time + Obj map[string]any +} + +func (t *QueueItem) Name() string { + return t.Obj["name"].(string) +} + +func NewQueueItem(obj map[string]any) *QueueItem { + return &QueueItem{ + Timestamp: time.Now(), + Obj: obj, + } +} + +func TestPriorityQueue(t *testing.T) { + g := NewWithT(t) + pq, err := NewQueue(QueueOpts[*QueueItem]{ + Metrics: MetricsOpts[*QueueItem]{ + Name: "test", + }, + Comparator: func(a, b *QueueItem) int { + return strings.Compare(a.Obj["name"].(string), b.Obj["name"].(string)) + }, + Dedupe: true, + Equals: func(a, b *QueueItem) bool { + return strings.EqualFold(a.Obj["name"].(string), b.Obj["name"].(string)) + }, + }) + g.Expect(err).To(BeNil()) + g.Expect(pq.Size()).To(BeZero()) + + names := []string{"bob", "foo", "bar", "eve", "baz", "alice", "bob"} + for _, name := range names { + pq.Enqueue(NewQueueItem(map[string]any{"name": name})) + } + + g.Expect(pq.Size()).To(BeNumerically("==", len(names))) + + expected := []string{"alice", "bar", "baz", "bob", "eve", "foo"} + for _, e := range expected { + g.Expect(first(pq.Peek()).Name()).To(Equal(e)) + g.Expect(first(pq.Dequeue()).Name()).Should(Equal(e)) + } + + g.Expect(pq.Size()).To(BeZero()) } func TestPriorityQueueDedupe(t *testing.T) { g := NewWithT(t) - pq, err := New(QueueOpts[string]{ + pq, err := NewQueue(QueueOpts[string]{ Equals: func(a, b string) bool { return a == b }, Dedupe: true, Comparator: strings.Compare, Metrics: MetricsOpts[string]{ - MetricName: "dedupe_queue", + Name: "dedupe_queue", }}) g.Expect(err).To(BeNil()) @@ -94,10 +143,10 @@ func TestPriorityQueueDedupe(t *testing.T) { func TestPriorityQueueConcurrency(t *testing.T) { g := NewWithT(t) - pq, err := New(QueueOpts[string]{ + pq, err := NewQueue(QueueOpts[string]{ Comparator: strings.Compare, Metrics: MetricsOpts[string]{ - MetricName: "concurrent_queue", + Name: "concurrent_queue", }, }) g.Expect(err).To(BeNil()) @@ -150,9 +199,8 @@ func TestPriorityQueueConcurrency(t *testing.T) { g.Expect(pq.Size()).To(BeNumerically("==", 0)) t.Log("\n" + matchers.DumpMetrics("priority")) - } -func first[T1 any, T2 any](a T1, b T2) T1 { +func first[T1 any, T2 any](a T1, _ T2) T1 { return a }