Skip to content

Commit

Permalink
chore: use priority queue from commons
Browse files Browse the repository at this point in the history
  • Loading branch information
adityathebe authored and moshloop committed Dec 12, 2024
1 parent afab95c commit bb2b67c
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 52 deletions.
11 changes: 2 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,9 @@ require (
github.com/aws/aws-sdk-go-v2/service/sts v1.30.5
github.com/aws/aws-sdk-go-v2/service/support v1.24.3
github.com/aws/smithy-go v1.22.1
github.com/emirpasic/gods v1.18.1
github.com/evanphx/json-patch v5.7.0+incompatible
github.com/flanksource/artifacts v1.0.14
github.com/flanksource/commons v1.32.1
github.com/flanksource/commons v1.34.0
github.com/flanksource/duty v1.0.769
github.com/flanksource/is-healthy v1.0.53
github.com/flanksource/ketall v1.1.7
Expand Down Expand Up @@ -117,10 +116,8 @@ require (
github.com/asecurityteam/rolling v2.0.4+incompatible // indirect
github.com/aws/aws-sdk-go-v2/config v1.27.29 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.17.29 // indirect
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bmatcuk/doublestar/v4 v4.7.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/cert-manager/cert-manager v1.9.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -129,6 +126,7 @@ require (
github.com/distribution/reference v0.5.0 // indirect
github.com/eko/gocache/lib/v4 v4.1.6 // indirect
github.com/eko/gocache/store/go_cache/v4 v4.2.2 // indirect
github.com/emirpasic/gods v1.18.1 // indirect
github.com/emirpasic/gods/v2 v2.0.0-alpha // indirect
github.com/evanphx/json-patch/v5 v5.7.0 // indirect
github.com/exaring/otelpgx v0.6.2 // indirect
Expand Down Expand Up @@ -162,7 +160,6 @@ require (
github.com/hashicorp/hcl/v2 v2.21.0 // indirect
github.com/henvic/httpretty v0.1.3 // indirect
github.com/hirochachacha/go-smb2 v1.1.0 // indirect
github.com/invopop/jsonschema v0.12.0 // indirect
github.com/itchyny/gojq v0.12.16 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 // indirect
Expand Down Expand Up @@ -207,11 +204,7 @@ require (
github.com/vadimi/go-http-ntlm v1.0.3 // indirect
github.com/vadimi/go-http-ntlm/v2 v2.4.1 // indirect
github.com/vadimi/go-ntlm v1.2.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
github.com/xanzy/ssh-agent v0.3.3 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f // indirect
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
github.com/xeipuuv/gojsonschema v1.2.0 // indirect
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 // indirect
github.com/yuin/gopher-lua v1.1.1 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
Expand Down
18 changes: 2 additions & 16 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -833,8 +833,6 @@ github.com/aws/aws-sdk-go-v2/service/support v1.24.3 h1:Bbesu6YZvEYACyydELMwUTYY
github.com/aws/aws-sdk-go-v2/service/support v1.24.3/go.mod h1:NvXUhACskXZ2tiXzECpC/97xKzyY7/Wcc1ug5rla7kY=
github.com/aws/smithy-go v1.22.1 h1:/HPHZQ0g7f4eUeK6HKglFz8uwVfZKgoI25rb/J+dnro=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand All @@ -853,8 +851,6 @@ github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl
github.com/boombuler/barcode v1.0.1/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/bwesterb/go-ristretto v1.2.3/go.mod h1:fUIoIZaG73pV5biE2Blr2xEzDoMj7NFEuV9ekS419A0=
github.com/cactus/go-statsd-client/statsd v0.0.0-20200423205355-cb0885a1018c/go.mod h1:l/bIBLeOl9eX+wxJAzxS4TveKRtAqlyDpHjhkfO0MEI=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
Expand Down Expand Up @@ -972,8 +968,8 @@ github.com/fergusstrange/embedded-postgres v1.25.0 h1:sa+k2Ycrtz40eCRPOzI7Ry7Ttk
github.com/fergusstrange/embedded-postgres v1.25.0/go.mod h1:t/MLs0h9ukYM6FSt99R7InCHs1nW0ordoVCcnzmpTYw=
github.com/flanksource/artifacts v1.0.14 h1:Vv70bccsae0MwGaf/uSPp34J5V1/PyKfct9z5JYCTJU=
github.com/flanksource/artifacts v1.0.14/go.mod h1:qHVCnQu5k50aWNJ5UhpcAKEl7pAzqUrFFKGSm147G70=
github.com/flanksource/commons v1.32.1 h1:380cRHbbjGoxHO8DbiqTrBz7XYw/qUjDIwKDcCNZqmU=
github.com/flanksource/commons v1.32.1/go.mod h1:gO/d401JGFx10+6/9V+PXiGAAVUwxrcLgkke1tGyyNU=
github.com/flanksource/commons v1.34.0 h1:cT9VYWMJDE/KSoPa71UUr1pl764MWBVI1PmrhFSc7B8=
github.com/flanksource/commons v1.34.0/go.mod h1:gO/d401JGFx10+6/9V+PXiGAAVUwxrcLgkke1tGyyNU=
github.com/flanksource/duty v1.0.769 h1:5EOBis382RhFLNkP2hVhYFAnXUdxRXm3GdCUgjYi+hY=
github.com/flanksource/duty v1.0.769/go.mod h1:sZY2NytdenrkqXoMD6Gn2C8xH6dm5HsqOeE0p74Z2VE=
github.com/flanksource/gomplate/v3 v3.20.4/go.mod h1:27BNWhzzSjDed1z8YShO6W+z6G9oZXuxfNFGd/iGSdc=
Expand Down Expand Up @@ -1320,8 +1316,6 @@ github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+h
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/invopop/jsonschema v0.12.0 h1:6ovsNSuvn9wEQVOyc72aycBMVQFKz7cPdMJn10CvzRI=
github.com/invopop/jsonschema v0.12.0/go.mod h1:ffZ5Km5SWWRAIN6wbDXItl95euhFz2uON45H2qjYt+0=
github.com/itchyny/gojq v0.12.13/go.mod h1:JzwzAqenfhrPUuwbmEz3nu3JQmFLlQTQMUcOdnu/Sf4=
github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g=
github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM=
Expand Down Expand Up @@ -1766,16 +1760,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/xanzy/ssh-agent v0.3.3 h1:+/15pJfg/RsTxqYcX6fHqOXZwwMP+2VyYWJeWM2qQFM=
github.com/xanzy/ssh-agent v0.3.3/go.mod h1:6dzNDKs0J9rVPHPhaGCukekBHKqfl+L3KghI1Bc68Uw=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f h1:J9EGpcZtP0E/raorCMxlFGSTBrsSlaDGf3jU/qvAE2c=
github.com/xeipuuv/gojsonpointer v0.0.0-20180127040702-4e3ac2762d5f/go.mod h1:N2zxlSyiKSe5eX1tZViRH5QA0qijqEDrYZiPEAiq3wU=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHovont7NscjpAxXsDA8S8BMYve8Y5+7cuRE7R0=
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xhit/go-str2duration v1.2.0/go.mod h1:3cPSlfZlUHVlneIVfePFWcJZsuwf+P1v2SRTV4cUmp4=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
github.com/xi2/xz v0.0.0-20171230120015-48954b6210f8 h1:nIPpBwaJSVYIxUFsDv3M8ofmx9yWTog9BfvIu0q41lo=
Expand Down
10 changes: 3 additions & 7 deletions scrapers/incremental.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"

pq "github.com/emirpasic/gods/queues/priorityqueue"
"github.com/flanksource/commons/collections"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/db"
Expand All @@ -32,7 +32,7 @@ func consumeKubernetesWatchJobKey(id string) string {

// ConsumeKubernetesWatchJobFunc returns a job that consumes kubernetes objects received from shared informers
// for the given config of the scrapeconfig.
func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, queue *pq.Queue) *job.Job {
func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, queue *collections.Queue[*kubernetes.QueueItem]) *job.Job {
return &job.Job{
Name: "ConsumeKubernetesWatch",
Context: sc.DutyContext().WithObject(sc.ScrapeConfig().ObjectMeta),
Expand Down Expand Up @@ -64,7 +64,7 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
)

for {
val, more := queue.Dequeue()
queueItem, more := queue.Dequeue()
if !more {
break
}
Expand All @@ -75,10 +75,6 @@ func ConsumeKubernetesWatchJobFunc(sc api.ScrapeContext, config v1.Kubernetes, q
break
}

queueItem, ok := val.(*kubernetes.QueueItem)
if !ok {
return fmt.Errorf("unexpected item in the priority queue: %T", val)
}
obj := queueItem.Obj
queuedTime[string(obj.GetUID())] = queueItem.Timestamp

Expand Down
40 changes: 26 additions & 14 deletions scrapers/kubernetes/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"sync"
"time"

pq "github.com/emirpasic/gods/queues/priorityqueue"
"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
Expand All @@ -34,10 +34,24 @@ var (
)

// WatchResources watches Kubernetes resources with shared informers
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*pq.Queue, error) {
priorityQueue := pq.NewWith(pqComparator)
func WatchResources(ctx api.ScrapeContext, config v1.Kubernetes) (*collections.Queue[*QueueItem], error) {
priorityQueue, err := collections.NewQueue(collections.QueueOpts[*QueueItem]{
Metrics: collections.MetricsOpts[*QueueItem]{
Name: "shared_informer",
Labels: map[string]any{
"scraper_id": ctx.ScraperID(),
},
},
Comparator: pqComparator,
Equals: queueItemIsEqual,
Dedupe: true,
})
if err != nil {
return nil, fmt.Errorf("failed to create queue: %w", err)
}

if loaded, ok := WatchQueue.LoadOrStore(config.Hash(), priorityQueue); ok {
priorityQueue = loaded.(*pq.Queue)
priorityQueue = loaded.(*collections.Queue[*QueueItem])
}

if config.Kubeconfig != nil {
Expand Down Expand Up @@ -86,7 +100,7 @@ type SharedInformerManager struct {

type DeleteObjHandler func(ctx context.Context, id string) error

func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *pq.Queue) error {
func (t *SharedInformerManager) Register(ctx api.ScrapeContext, watchResource v1.KubernetesResourceToWatch, queue *collections.Queue[*QueueItem]) error {
registrationTime := time.Now()

apiVersion, kind := watchResource.ApiVersion, watchResource.Kind
Expand Down Expand Up @@ -385,21 +399,19 @@ func NewQueueItem(obj *unstructured.Unstructured, operation QueueItemOperation)
}
}

func pqComparator(a, b any) int {
if a == nil || b == nil {
return 0
}

qa := a.(*QueueItem)
qb := b.(*QueueItem)
func queueItemIsEqual(qa, qb *QueueItem) bool {
return qa.Obj.GetUID() == qb.Obj.GetUID()
}

func pqComparator(qa, qb *QueueItem) int {
if qa.Obj.GetUID() == qb.Obj.GetUID() {
resourceVersionA, ok, _ := unstructured.NestedString(qa.Obj.Object, "metadata", "resourceVersion")
if ok {
resourceVersionB, _, _ := unstructured.NestedString(qb.Obj.Object, "metadata", "resourceVersion")

// Because of the way we are deduping, we want the earlier version in front of the queue.
return strings.Compare(resourceVersionA, resourceVersionB)
// Because of the way we are deduping, we want the latest version in front of the queue.
// the later versions are discarded.
return strings.Compare(resourceVersionB, resourceVersionA)
}
}

Expand Down
22 changes: 16 additions & 6 deletions scrapers/kubernetes/informers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ import (
"testing"
"time"

"github.com/emirpasic/gods/queues/priorityqueue"
"github.com/flanksource/commons/collections"
"github.com/flanksource/commons/hash"
"github.com/google/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
Expand Down Expand Up @@ -166,27 +167,35 @@ func TestPqComparator(t *testing.T) {
Obj: getUnstructuredWithResourceVersion("Pod", "a", "2c6a2f24-0199-435d-83a6-bd3f6d18d06d", "4", now.Add(-1*time.Hour)),
},
},
expected: []string{"a-1", "a-2", "a-3", "a-4"},
expected: []string{"a-4"},
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
q := priorityqueue.NewWith(pqComparator)
q, err := collections.NewQueue(collections.QueueOpts[*QueueItem]{
Metrics: collections.MetricsOpts[*QueueItem]{
Name: fmt.Sprintf("m_%s", hash.Sha256Hex(tt.name)[:10]),
},
Comparator: pqComparator,
Equals: queueItemIsEqual,
Dedupe: true,
})
if err != nil {
t.Fatalf("failed to create queue: %v", err)
}

for _, item := range tt.Items {
q.Enqueue(&item)
}

var result []string
for {
v, ok := q.Dequeue()
item, ok := q.Dequeue()
if !ok {
break
}

item := v.(*QueueItem)

resourceVersion, ok, _ := unstructured.NestedString(item.Obj.Object, "metadata", "resourceVersion")
if ok {
result = append(result, fmt.Sprintf("%s-%s", item.Obj.GetName(), resourceVersion))
Expand All @@ -207,6 +216,7 @@ func getUnstructuredEvent(kind, name string, creationTimestamp, recreationTimest
Object: map[string]any{
"kind": kind,
"metadata": map[string]any{
"uid": uuid.NewString(),
"name": name,
"creationTimestamp": creationTimestamp.Format(time.RFC3339),
"managedFields": []any{
Expand Down

0 comments on commit bb2b67c

Please sign in to comment.