From 8bfda9d3ca29ba36eae02e978499892c728e5dd8 Mon Sep 17 00:00:00 2001 From: Tyler Williams Date: Fri, 20 Dec 2024 10:15:27 -0800 Subject: [PATCH] Add a generic priority queue (used in followup) (#8105) --- server/util/priority_queue/BUILD | 17 +++ server/util/priority_queue/priority_queue.go | 134 ++++++++++++++++++ .../priority_queue/priority_queue_test.go | 35 +++++ 3 files changed, 186 insertions(+) create mode 100644 server/util/priority_queue/BUILD create mode 100644 server/util/priority_queue/priority_queue.go create mode 100644 server/util/priority_queue/priority_queue_test.go diff --git a/server/util/priority_queue/BUILD b/server/util/priority_queue/BUILD new file mode 100644 index 00000000000..e0345979e03 --- /dev/null +++ b/server/util/priority_queue/BUILD @@ -0,0 +1,17 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "priority_queue", + srcs = ["priority_queue.go"], + importpath = "github.com/buildbuddy-io/buildbuddy/server/util/priority_queue", + visibility = ["//visibility:public"], +) + +go_test( + name = "priority_queue_test", + srcs = ["priority_queue_test.go"], + deps = [ + ":priority_queue", + "@com_github_stretchr_testify//assert", + ], +) diff --git a/server/util/priority_queue/priority_queue.go b/server/util/priority_queue/priority_queue.go new file mode 100644 index 00000000000..6b7c8194daa --- /dev/null +++ b/server/util/priority_queue/priority_queue.go @@ -0,0 +1,134 @@ +package priority_queue + +import ( + "container/heap" + "sync" + "time" +) + +// A pqItem is the element managed by a priority queue. +type pqItem[V any] struct { + value V + priority int + insertTime time.Time +} + +// innerPQ implements heap.Interface and holds pqItems. +type innerPQ[V any] []*pqItem[V] + +func (pq innerPQ[V]) Len() int { return len(pq) } +func (pq innerPQ[V]) Less(i, j int) bool { + return pq[i].priority > pq[j].priority || + (pq[i].priority == pq[j].priority && pq[i].insertTime.Before(pq[j].insertTime)) +} +func (pq innerPQ[V]) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] +} +func (pq *innerPQ[V]) Push(x any) { + item := x.(*pqItem[V]) + *pq = append(*pq, item) +} +func (pq *innerPQ[V]) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + *pq = old[0 : n-1] + return item +} + +// PriorityQueue implements a thread safe priority queue for type V. +// If the queue is empty, calling Pop() or Peek() will return a zero value of +// type V, or a specific empty value configured via options. +type PriorityQueue[V any] struct { + inner *innerPQ[V] + mu sync.Mutex + noValueFunc func() V +} + +type Options struct { + noValueGetter func() any +} + +func defaultOptions() *Options { + return &Options{} +} + +type Option func(*Options) + +func WithEmptyValue(v any) Option { + return func(o *Options) { + o.noValueGetter = func() any { + return v + } + } +} + +func New[V any](opts ...Option) *PriorityQueue[V] { + inner := make(innerPQ[V], 0) + pq := &PriorityQueue[V]{ + inner: &inner, + } + options := defaultOptions() + for _, opt := range opts { + opt(options) + } + if options.noValueGetter != nil { + pq.noValueFunc = func() V { + return options.noValueGetter().(V) + } + } else { + pq.noValueFunc = pq.zeroValue + } + return pq +} + +func (pq *PriorityQueue[V]) zeroValue() V { + var zero V + return zero +} + +func (pq *PriorityQueue[V]) Push(v V, priority int) { + pq.mu.Lock() + defer pq.mu.Unlock() + heap.Push(pq.inner, &pqItem[V]{ + value: v, + insertTime: time.Now(), + priority: priority, + }) +} + +func (pq *PriorityQueue[V]) Pop() V { + pq.mu.Lock() + defer pq.mu.Unlock() + if len(*pq.inner) == 0 { + return pq.noValueFunc() + } + item := heap.Pop(pq.inner).(*pqItem[V]) + return item.value +} + +func (pq *PriorityQueue[V]) Peek() V { + pq.mu.Lock() + defer pq.mu.Unlock() + if len(*pq.inner) == 0 { + return pq.noValueFunc() + } + return (*pq.inner)[0].value +} + +func (pq *PriorityQueue[V]) GetAll() []V { + pq.mu.Lock() + defer pq.mu.Unlock() + var allValues []V + for _, i := range *pq.inner { + allValues = append(allValues, i.value) + } + return allValues +} + +func (pq *PriorityQueue[V]) Len() int { + pq.mu.Lock() + defer pq.mu.Unlock() + return len(*pq.inner) +} diff --git a/server/util/priority_queue/priority_queue_test.go b/server/util/priority_queue/priority_queue_test.go new file mode 100644 index 00000000000..233791d6b0b --- /dev/null +++ b/server/util/priority_queue/priority_queue_test.go @@ -0,0 +1,35 @@ +package priority_queue_test + +import ( + "testing" + + "github.com/buildbuddy-io/buildbuddy/server/util/priority_queue" + "github.com/stretchr/testify/assert" +) + +func TestPushPop(t *testing.T) { + q := priority_queue.New[string]() + q.Push("A", 1) + q.Push("E", 5) + q.Push("D", 4) + q.Push("B", 2) + + assert.Equal(t, "E", q.Pop()) + assert.Equal(t, "D", q.Pop()) + assert.Equal(t, "B", q.Pop()) + assert.Equal(t, "A", q.Pop()) + assert.Equal(t, "", q.Pop()) +} + +func TestZeroValue(t *testing.T) { + q := priority_queue.New[int](priority_queue.WithEmptyValue(-1)) + q.Push(1, 1) + q.Push(2, 5) + q.Push(3, 4) + + assert.Equal(t, 2, q.Pop()) + assert.Equal(t, 3, q.Pop()) + assert.Equal(t, 1, q.Pop()) + assert.Equal(t, -1, q.Pop()) + assert.Equal(t, -1, q.Pop()) +}