Skip to content

Commit

Permalink
improve fetching components
Browse files Browse the repository at this point in the history
  • Loading branch information
zreigz committed Oct 29, 2024
1 parent c9d9bb8 commit cfbcd7f
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 49 deletions.
54 changes: 27 additions & 27 deletions pkg/ping/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,42 +3,42 @@ package ping
import (
"strings"

"github.com/pluralsh/deployment-operator/pkg/scraper"

console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/pkg/scraper"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/version"
"k8s.io/klog/v2"
)

func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing {
vs := strings.Split(info.GitVersion, "-")
var insightComponents []*console.ClusterInsightComponentAttributes
if !scraper.AiInsightComponents.IsEmpty() {
insightComponents = make([]*console.ClusterInsightComponentAttributes, 0)
for value, fresh := range scraper.AiInsightComponents.Items() {
if fresh {
attr := &console.ClusterInsightComponentAttributes{
Version: value.Gvk.Version,
Kind: value.Gvk.Kind,
Name: value.Name,
}
if len(value.Gvk.Group) > 0 {
attr.Group = lo.ToPtr(value.Gvk.Group)
}
if len(value.Namespace) > 0 {
attr.Namespace = lo.ToPtr(value.Namespace)
}
insightComponents = append(insightComponents, attr)
// set fresh state to false
scraper.AiInsightComponents.Set(value, false)
cp := console.ClusterPing{
CurrentVersion: strings.TrimPrefix(vs[0], "v"),
Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))),
KubeletVersion: minKubeletVersion,
}
if scraper.GetAiInsightComponents().IsFresh() && scraper.GetAiInsightComponents().Len() > 0 {
klog.Info("found ", scraper.GetAiInsightComponents().Len(), " fresh AI insight components")

scraper.GetAiInsightComponents().SetFresh(false)
insightComponents := make([]*console.ClusterInsightComponentAttributes, 0)

for _, value := range scraper.GetAiInsightComponents().GetItems() {
attr := &console.ClusterInsightComponentAttributes{
Version: value.Gvk.Version,
Kind: value.Gvk.Kind,
Name: value.Name,
}
if len(value.Gvk.Group) > 0 {
attr.Group = lo.ToPtr(value.Gvk.Group)
}
if len(value.Namespace) > 0 {
attr.Namespace = lo.ToPtr(value.Namespace)
}
insightComponents = append(insightComponents, attr)
}
cp.InsightComponents = insightComponents
}

return console.ClusterPing{
CurrentVersion: strings.TrimPrefix(vs[0], "v"),
Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))),
KubeletVersion: minKubeletVersion,
InsightComponents: insightComponents,
}
return cp
}
117 changes: 95 additions & 22 deletions pkg/scraper/component_scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package scraper
import (
"context"
"fmt"
"sync"
"time"

cmap "github.com/orcaman/concurrent-map/v2"
"github.com/pluralsh/deployment-operator/internal/helpers"
"github.com/pluralsh/deployment-operator/pkg/common"
agentcommon "github.com/pluralsh/deployment-operator/pkg/common"
common2 "github.com/pluralsh/deployment-operator/pkg/controller/common"
"github.com/pluralsh/polly/algorithms"
"github.com/pluralsh/polly/containers"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
Expand All @@ -18,11 +22,19 @@ import (
const name = "Ai Insight Component Scraper"

var (
AiInsightComponents cmap.ConcurrentMap[Component, bool]
aiInsightComponents *AiInsightComponents
)

func init() {
AiInsightComponents = cmap.NewStringer[Component, bool]()
aiInsightComponents = &AiInsightComponents{
items: containers.NewSet[Component](),
}
}

type AiInsightComponents struct {
mu sync.RWMutex
items containers.Set[Component]
fresh bool
}

type Component struct {
Expand All @@ -35,6 +47,46 @@ func (c Component) String() string {
return fmt.Sprintf("GVK=%s, Name=%s, Namespace=%s", c.Gvk.String(), c.Name, c.Namespace)
}

func GetAiInsightComponents() *AiInsightComponents {
return aiInsightComponents
}

func (s *AiInsightComponents) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
s.items = containers.NewSet[Component]()
}

func (s *AiInsightComponents) AddItem(c Component) {
s.mu.Lock()
defer s.mu.Unlock()
s.items.Add(c)
}

func (s *AiInsightComponents) GetItems() []Component {
s.mu.RLock()
defer s.mu.RUnlock()
return s.items.List()
}

func (s *AiInsightComponents) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.items.Len()
}

func (s *AiInsightComponents) IsFresh() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.fresh
}

func (s *AiInsightComponents) SetFresh(f bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.fresh = f
}

func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) {
klog.Info("starting ", name)
scrapeResources := []schema.GroupVersionKind{
Expand All @@ -48,13 +100,14 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien
}

err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) {
AiInsightComponents.Clear()
GetAiInsightComponents().Clear()

for _, gvk := range scrapeResources {
if err := setUnhealthyComponents(ctx, k8sClient, gvk, ctrclient.MatchingLabels(manageByOperatorLabels)); err != nil {
klog.Error(err, "can't set update component status")
}
}
GetAiInsightComponents().SetFresh(true)

return false, nil
})
Expand All @@ -64,28 +117,48 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien
}

func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error {
// Create an unstructured list with the desired GVK
list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)

// List resources
if err := k8sClient.List(ctx, list, opts...); err != nil {
return fmt.Errorf("failed to list resources: %w", err)
}

// Iterate over each unstructured object
for _, item := range list.Items {
health, err := common.GetResourceHealth(&item)
pager := listResources(ctx, k8sClient, gvk, opts...)
for pager.HasNext() {
items, err := pager.NextPage()
if err != nil {
return err
}
if health.Status == common.HealthStatusDegraded {
AiInsightComponents.Set(Component{
Gvk: gvk,
Name: item.GetName(),
Namespace: item.GetNamespace(),
}, true)
for _, item := range items {
health, err := common.GetResourceHealth(&item)
if err != nil {
return err
}
if health.Status == common.HealthStatusDegraded {
GetAiInsightComponents().AddItem(Component{
Gvk: gvk,
Name: item.GetName(),
Namespace: item.GetNamespace(),
})
}
}
}
return nil
}

func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) *algorithms.Pager[unstructured.Unstructured] {
fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) {
list := &unstructured.UnstructuredList{}
list.SetGroupVersionKind(gvk)

if page != nil {
opts = append(opts, ctrclient.Continue(*page))
}
opts = append(opts, ctrclient.Limit(size))
// List resources
if err := k8sClient.List(ctx, list, opts...); err != nil {
return nil, nil, fmt.Errorf("failed to list resources: %w", err)
}
pageInfo := &algorithms.PageInfo{
HasNext: list.GetContinue() != "",
After: lo.ToPtr(list.GetContinue()),
PageSize: size,
}
return list.Items, pageInfo, nil
}
return algorithms.NewPager[unstructured.Unstructured](common2.DefaultPageSize, fetch)
}

0 comments on commit cfbcd7f

Please sign in to comment.