diff --git a/pkg/ping/build.go b/pkg/ping/build.go index 34d0f8ff..15446b8c 100644 --- a/pkg/ping/build.go +++ b/pkg/ping/build.go @@ -3,42 +3,42 @@ package ping import ( "strings" - "github.com/pluralsh/deployment-operator/pkg/scraper" - console "github.com/pluralsh/console/go/client" + "github.com/pluralsh/deployment-operator/pkg/scraper" "github.com/samber/lo" "k8s.io/apimachinery/pkg/version" + "k8s.io/klog/v2" ) func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing { vs := strings.Split(info.GitVersion, "-") - var insightComponents []*console.ClusterInsightComponentAttributes - if !scraper.AiInsightComponents.IsEmpty() { - insightComponents = make([]*console.ClusterInsightComponentAttributes, 0) - for value, fresh := range scraper.AiInsightComponents.Items() { - if fresh { - attr := &console.ClusterInsightComponentAttributes{ - Version: value.Gvk.Version, - Kind: value.Gvk.Kind, - Name: value.Name, - } - if len(value.Gvk.Group) > 0 { - attr.Group = lo.ToPtr(value.Gvk.Group) - } - if len(value.Namespace) > 0 { - attr.Namespace = lo.ToPtr(value.Namespace) - } - insightComponents = append(insightComponents, attr) - // set fresh state to false - scraper.AiInsightComponents.Set(value, false) + cp := console.ClusterPing{ + CurrentVersion: strings.TrimPrefix(vs[0], "v"), + Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), + KubeletVersion: minKubeletVersion, + } + if scraper.GetAiInsightComponents().IsFresh() && scraper.GetAiInsightComponents().Len() > 0 { + klog.Info("found ", scraper.GetAiInsightComponents().Len(), " fresh AI insight components") + + scraper.GetAiInsightComponents().SetFresh(false) + insightComponents := make([]*console.ClusterInsightComponentAttributes, 0) + + for _, value := range scraper.GetAiInsightComponents().GetItems() { + attr := &console.ClusterInsightComponentAttributes{ + Version: value.Gvk.Version, + Kind: value.Gvk.Kind, + Name: value.Name, + } + if len(value.Gvk.Group) > 0 { + attr.Group = lo.ToPtr(value.Gvk.Group) } + if len(value.Namespace) > 0 { + attr.Namespace = lo.ToPtr(value.Namespace) + } + insightComponents = append(insightComponents, attr) } + cp.InsightComponents = insightComponents } - return console.ClusterPing{ - CurrentVersion: strings.TrimPrefix(vs[0], "v"), - Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), - KubeletVersion: minKubeletVersion, - InsightComponents: insightComponents, - } + return cp } diff --git a/pkg/scraper/component_scraper.go b/pkg/scraper/component_scraper.go index 426eb237..d1c15426 100644 --- a/pkg/scraper/component_scraper.go +++ b/pkg/scraper/component_scraper.go @@ -3,12 +3,16 @@ package scraper import ( "context" "fmt" + "sync" "time" - cmap "github.com/orcaman/concurrent-map/v2" "github.com/pluralsh/deployment-operator/internal/helpers" "github.com/pluralsh/deployment-operator/pkg/common" agentcommon "github.com/pluralsh/deployment-operator/pkg/common" + common2 "github.com/pluralsh/deployment-operator/pkg/controller/common" + "github.com/pluralsh/polly/algorithms" + "github.com/pluralsh/polly/containers" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" @@ -18,11 +22,19 @@ import ( const name = "Ai Insight Component Scraper" var ( - AiInsightComponents cmap.ConcurrentMap[Component, bool] + aiInsightComponents *AiInsightComponents ) func init() { - AiInsightComponents = cmap.NewStringer[Component, bool]() + aiInsightComponents = &AiInsightComponents{ + items: containers.NewSet[Component](), + } +} + +type AiInsightComponents struct { + mu sync.RWMutex + items containers.Set[Component] + fresh bool } type Component struct { @@ -35,6 +47,46 @@ func (c Component) String() string { return fmt.Sprintf("GVK=%s, Name=%s, Namespace=%s", c.Gvk.String(), c.Name, c.Namespace) } +func GetAiInsightComponents() *AiInsightComponents { + return aiInsightComponents +} + +func (s *AiInsightComponents) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + s.items = containers.NewSet[Component]() +} + +func (s *AiInsightComponents) AddItem(c Component) { + s.mu.Lock() + defer s.mu.Unlock() + s.items.Add(c) +} + +func (s *AiInsightComponents) GetItems() []Component { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.List() +} + +func (s *AiInsightComponents) Len() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.Len() +} + +func (s *AiInsightComponents) IsFresh() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.fresh +} + +func (s *AiInsightComponents) SetFresh(f bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.fresh = f +} + func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) { klog.Info("starting ", name) scrapeResources := []schema.GroupVersionKind{ @@ -48,13 +100,14 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien } err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) { - AiInsightComponents.Clear() + GetAiInsightComponents().Clear() for _, gvk := range scrapeResources { if err := setUnhealthyComponents(ctx, k8sClient, gvk, ctrclient.MatchingLabels(manageByOperatorLabels)); err != nil { klog.Error(err, "can't set update component status") } } + GetAiInsightComponents().SetFresh(true) return false, nil }) @@ -64,28 +117,48 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien } func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error { - // Create an unstructured list with the desired GVK - list := &unstructured.UnstructuredList{} - list.SetGroupVersionKind(gvk) - - // List resources - if err := k8sClient.List(ctx, list, opts...); err != nil { - return fmt.Errorf("failed to list resources: %w", err) - } - - // Iterate over each unstructured object - for _, item := range list.Items { - health, err := common.GetResourceHealth(&item) + pager := listResources(ctx, k8sClient, gvk, opts...) + for pager.HasNext() { + items, err := pager.NextPage() if err != nil { return err } - if health.Status == common.HealthStatusDegraded { - AiInsightComponents.Set(Component{ - Gvk: gvk, - Name: item.GetName(), - Namespace: item.GetNamespace(), - }, true) + for _, item := range items { + health, err := common.GetResourceHealth(&item) + if err != nil { + return err + } + if health.Status == common.HealthStatusDegraded { + GetAiInsightComponents().AddItem(Component{ + Gvk: gvk, + Name: item.GetName(), + Namespace: item.GetNamespace(), + }) + } } } return nil } + +func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) *algorithms.Pager[unstructured.Unstructured] { + fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) { + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(gvk) + + if page != nil { + opts = append(opts, ctrclient.Continue(*page)) + } + opts = append(opts, ctrclient.Limit(size)) + // List resources + if err := k8sClient.List(ctx, list, opts...); err != nil { + return nil, nil, fmt.Errorf("failed to list resources: %w", err) + } + pageInfo := &algorithms.PageInfo{ + HasNext: list.GetContinue() != "", + After: lo.ToPtr(list.GetContinue()), + PageSize: size, + } + return list.Items, pageInfo, nil + } + return algorithms.NewPager[unstructured.Unstructured](common2.DefaultPageSize, fetch) +}