From c9d9bb88e28ff71dba9e153b1583e26df46490e0 Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Mon, 28 Oct 2024 14:47:35 +0100 Subject: [PATCH 1/3] cluster ai insight component scraper --- cmd/agent/main.go | 4 ++ go.mod | 2 +- go.sum | 4 +- pkg/ping/build.go | 32 +++++++++-- pkg/scraper/component_scraper.go | 91 ++++++++++++++++++++++++++++++++ 5 files changed, 127 insertions(+), 6 deletions(-) create mode 100644 pkg/scraper/component_scraper.go diff --git a/cmd/agent/main.go b/cmd/agent/main.go index a2e02eb9..8b870b66 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -13,6 +13,7 @@ import ( "github.com/pluralsh/deployment-operator/pkg/cache" "github.com/pluralsh/deployment-operator/pkg/client" consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/scraper" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" @@ -70,6 +71,9 @@ func main() { // Start the discovery cache in background. cache.RunDiscoveryCacheInBackgroundOrDie(ctx, discoveryClient) + // Start AI insight component scraper in background + scraper.RunAiInsightComponentScraperInBackgroundOrDie(ctx, kubeManager.GetClient()) + // Start the console manager in background. runConsoleManagerInBackgroundOrDie(ctx, consoleManager) diff --git a/go.mod b/go.mod index 38ba67cf..331b0427 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/open-policy-agent/gatekeeper/v3 v3.17.1 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 - github.com/pluralsh/console/go/client v1.22.1 + github.com/pluralsh/console/go/client v1.22.2 github.com/pluralsh/controller-reconcile-helper v0.1.0 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 diff --git a/go.sum b/go.sum index 0fb29c90..8aafd1ee 100644 --- a/go.sum +++ b/go.sum @@ -666,8 +666,8 @@ github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rK github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pluralsh/console/go/client v1.22.1 h1:kC6QEiOkUnOGyJFugsPejzMNlM+4VgfJR1oST9Pxfkg= -github.com/pluralsh/console/go/client v1.22.1/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= +github.com/pluralsh/console/go/client v1.22.2 h1:mOWPel/oZ9vypJvF136/CWIl6P2Rj89uarHvoRJDnHI= +github.com/pluralsh/console/go/client v1.22.2/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo= github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= diff --git a/pkg/ping/build.go b/pkg/ping/build.go index d7d6224a..34d0f8ff 100644 --- a/pkg/ping/build.go +++ b/pkg/ping/build.go @@ -3,6 +3,8 @@ package ping import ( "strings" + "github.com/pluralsh/deployment-operator/pkg/scraper" + console "github.com/pluralsh/console/go/client" "github.com/samber/lo" "k8s.io/apimachinery/pkg/version" @@ -10,9 +12,33 @@ import ( func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing { vs := strings.Split(info.GitVersion, "-") + var insightComponents []*console.ClusterInsightComponentAttributes + if !scraper.AiInsightComponents.IsEmpty() { + insightComponents = make([]*console.ClusterInsightComponentAttributes, 0) + for value, fresh := range scraper.AiInsightComponents.Items() { + if fresh { + attr := &console.ClusterInsightComponentAttributes{ + Version: value.Gvk.Version, + Kind: value.Gvk.Kind, + Name: value.Name, + } + if len(value.Gvk.Group) > 0 { + attr.Group = lo.ToPtr(value.Gvk.Group) + } + if len(value.Namespace) > 0 { + attr.Namespace = lo.ToPtr(value.Namespace) + } + insightComponents = append(insightComponents, attr) + // set fresh state to false + scraper.AiInsightComponents.Set(value, false) + } + } + } + return console.ClusterPing{ - CurrentVersion: strings.TrimPrefix(vs[0], "v"), - Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), - KubeletVersion: minKubeletVersion, + CurrentVersion: strings.TrimPrefix(vs[0], "v"), + Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), + KubeletVersion: minKubeletVersion, + InsightComponents: insightComponents, } } diff --git a/pkg/scraper/component_scraper.go b/pkg/scraper/component_scraper.go new file mode 100644 index 00000000..426eb237 --- /dev/null +++ b/pkg/scraper/component_scraper.go @@ -0,0 +1,91 @@ +package scraper + +import ( + "context" + "fmt" + "time" + + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/pkg/common" + agentcommon "github.com/pluralsh/deployment-operator/pkg/common" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + ctrclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const name = "Ai Insight Component Scraper" + +var ( + AiInsightComponents cmap.ConcurrentMap[Component, bool] +) + +func init() { + AiInsightComponents = cmap.NewStringer[Component, bool]() +} + +type Component struct { + Gvk schema.GroupVersionKind + Name string + Namespace string +} + +func (c Component) String() string { + return fmt.Sprintf("GVK=%s, Name=%s, Namespace=%s", c.Gvk.String(), c.Name, c.Namespace) +} + +func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) { + klog.Info("starting ", name) + scrapeResources := []schema.GroupVersionKind{ + {Group: "apps", Version: "v1", Kind: "Deployment"}, + {Group: "apps", Version: "v1", Kind: "DaemonSet"}, + {Group: "apps", Version: "v1", Kind: "StatefulSet"}, + } + + manageByOperatorLabels := map[string]string{ + agentcommon.ManagedByLabel: agentcommon.AgentLabelValue, + } + + err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) { + AiInsightComponents.Clear() + + for _, gvk := range scrapeResources { + if err := setUnhealthyComponents(ctx, k8sClient, gvk, ctrclient.MatchingLabels(manageByOperatorLabels)); err != nil { + klog.Error(err, "can't set update component status") + } + } + + return false, nil + }) + if err != nil { + panic(fmt.Errorf("failed to start %s in background: %w", name, err)) + } +} + +func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error { + // Create an unstructured list with the desired GVK + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(gvk) + + // List resources + if err := k8sClient.List(ctx, list, opts...); err != nil { + return fmt.Errorf("failed to list resources: %w", err) + } + + // Iterate over each unstructured object + for _, item := range list.Items { + health, err := common.GetResourceHealth(&item) + if err != nil { + return err + } + if health.Status == common.HealthStatusDegraded { + AiInsightComponents.Set(Component{ + Gvk: gvk, + Name: item.GetName(), + Namespace: item.GetNamespace(), + }, true) + } + } + return nil +} From cfbcd7f9c004ae864b360815ebec68514f9319ec Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Tue, 29 Oct 2024 09:56:10 +0100 Subject: [PATCH 2/3] improve fetching components --- pkg/ping/build.go | 54 +++++++------- pkg/scraper/component_scraper.go | 117 +++++++++++++++++++++++++------ 2 files changed, 122 insertions(+), 49 deletions(-) diff --git a/pkg/ping/build.go b/pkg/ping/build.go index 34d0f8ff..15446b8c 100644 --- a/pkg/ping/build.go +++ b/pkg/ping/build.go @@ -3,42 +3,42 @@ package ping import ( "strings" - "github.com/pluralsh/deployment-operator/pkg/scraper" - console "github.com/pluralsh/console/go/client" + "github.com/pluralsh/deployment-operator/pkg/scraper" "github.com/samber/lo" "k8s.io/apimachinery/pkg/version" + "k8s.io/klog/v2" ) func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing { vs := strings.Split(info.GitVersion, "-") - var insightComponents []*console.ClusterInsightComponentAttributes - if !scraper.AiInsightComponents.IsEmpty() { - insightComponents = make([]*console.ClusterInsightComponentAttributes, 0) - for value, fresh := range scraper.AiInsightComponents.Items() { - if fresh { - attr := &console.ClusterInsightComponentAttributes{ - Version: value.Gvk.Version, - Kind: value.Gvk.Kind, - Name: value.Name, - } - if len(value.Gvk.Group) > 0 { - attr.Group = lo.ToPtr(value.Gvk.Group) - } - if len(value.Namespace) > 0 { - attr.Namespace = lo.ToPtr(value.Namespace) - } - insightComponents = append(insightComponents, attr) - // set fresh state to false - scraper.AiInsightComponents.Set(value, false) + cp := console.ClusterPing{ + CurrentVersion: strings.TrimPrefix(vs[0], "v"), + Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), + KubeletVersion: minKubeletVersion, + } + if scraper.GetAiInsightComponents().IsFresh() && scraper.GetAiInsightComponents().Len() > 0 { + klog.Info("found ", scraper.GetAiInsightComponents().Len(), " fresh AI insight components") + + scraper.GetAiInsightComponents().SetFresh(false) + insightComponents := make([]*console.ClusterInsightComponentAttributes, 0) + + for _, value := range scraper.GetAiInsightComponents().GetItems() { + attr := &console.ClusterInsightComponentAttributes{ + Version: value.Gvk.Version, + Kind: value.Gvk.Kind, + Name: value.Name, + } + if len(value.Gvk.Group) > 0 { + attr.Group = lo.ToPtr(value.Gvk.Group) } + if len(value.Namespace) > 0 { + attr.Namespace = lo.ToPtr(value.Namespace) + } + insightComponents = append(insightComponents, attr) } + cp.InsightComponents = insightComponents } - return console.ClusterPing{ - CurrentVersion: strings.TrimPrefix(vs[0], "v"), - Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), - KubeletVersion: minKubeletVersion, - InsightComponents: insightComponents, - } + return cp } diff --git a/pkg/scraper/component_scraper.go b/pkg/scraper/component_scraper.go index 426eb237..d1c15426 100644 --- a/pkg/scraper/component_scraper.go +++ b/pkg/scraper/component_scraper.go @@ -3,12 +3,16 @@ package scraper import ( "context" "fmt" + "sync" "time" - cmap "github.com/orcaman/concurrent-map/v2" "github.com/pluralsh/deployment-operator/internal/helpers" "github.com/pluralsh/deployment-operator/pkg/common" agentcommon "github.com/pluralsh/deployment-operator/pkg/common" + common2 "github.com/pluralsh/deployment-operator/pkg/controller/common" + "github.com/pluralsh/polly/algorithms" + "github.com/pluralsh/polly/containers" + "github.com/samber/lo" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/klog/v2" @@ -18,11 +22,19 @@ import ( const name = "Ai Insight Component Scraper" var ( - AiInsightComponents cmap.ConcurrentMap[Component, bool] + aiInsightComponents *AiInsightComponents ) func init() { - AiInsightComponents = cmap.NewStringer[Component, bool]() + aiInsightComponents = &AiInsightComponents{ + items: containers.NewSet[Component](), + } +} + +type AiInsightComponents struct { + mu sync.RWMutex + items containers.Set[Component] + fresh bool } type Component struct { @@ -35,6 +47,46 @@ func (c Component) String() string { return fmt.Sprintf("GVK=%s, Name=%s, Namespace=%s", c.Gvk.String(), c.Name, c.Namespace) } +func GetAiInsightComponents() *AiInsightComponents { + return aiInsightComponents +} + +func (s *AiInsightComponents) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + s.items = containers.NewSet[Component]() +} + +func (s *AiInsightComponents) AddItem(c Component) { + s.mu.Lock() + defer s.mu.Unlock() + s.items.Add(c) +} + +func (s *AiInsightComponents) GetItems() []Component { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.List() +} + +func (s *AiInsightComponents) Len() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.Len() +} + +func (s *AiInsightComponents) IsFresh() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.fresh +} + +func (s *AiInsightComponents) SetFresh(f bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.fresh = f +} + func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) { klog.Info("starting ", name) scrapeResources := []schema.GroupVersionKind{ @@ -48,13 +100,14 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien } err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) { - AiInsightComponents.Clear() + GetAiInsightComponents().Clear() for _, gvk := range scrapeResources { if err := setUnhealthyComponents(ctx, k8sClient, gvk, ctrclient.MatchingLabels(manageByOperatorLabels)); err != nil { klog.Error(err, "can't set update component status") } } + GetAiInsightComponents().SetFresh(true) return false, nil }) @@ -64,28 +117,48 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien } func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error { - // Create an unstructured list with the desired GVK - list := &unstructured.UnstructuredList{} - list.SetGroupVersionKind(gvk) - - // List resources - if err := k8sClient.List(ctx, list, opts...); err != nil { - return fmt.Errorf("failed to list resources: %w", err) - } - - // Iterate over each unstructured object - for _, item := range list.Items { - health, err := common.GetResourceHealth(&item) + pager := listResources(ctx, k8sClient, gvk, opts...) + for pager.HasNext() { + items, err := pager.NextPage() if err != nil { return err } - if health.Status == common.HealthStatusDegraded { - AiInsightComponents.Set(Component{ - Gvk: gvk, - Name: item.GetName(), - Namespace: item.GetNamespace(), - }, true) + for _, item := range items { + health, err := common.GetResourceHealth(&item) + if err != nil { + return err + } + if health.Status == common.HealthStatusDegraded { + GetAiInsightComponents().AddItem(Component{ + Gvk: gvk, + Name: item.GetName(), + Namespace: item.GetNamespace(), + }) + } } } return nil } + +func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) *algorithms.Pager[unstructured.Unstructured] { + fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) { + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(gvk) + + if page != nil { + opts = append(opts, ctrclient.Continue(*page)) + } + opts = append(opts, ctrclient.Limit(size)) + // List resources + if err := k8sClient.List(ctx, list, opts...); err != nil { + return nil, nil, fmt.Errorf("failed to list resources: %w", err) + } + pageInfo := &algorithms.PageInfo{ + HasNext: list.GetContinue() != "", + After: lo.ToPtr(list.GetContinue()), + PageSize: size, + } + return list.Items, pageInfo, nil + } + return algorithms.NewPager[unstructured.Unstructured](common2.DefaultPageSize, fetch) +} From d83e896c03f4c7f20f06ab62ee2c877909e66c9e Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Wed, 30 Oct 2024 11:46:11 +0100 Subject: [PATCH 3/3] add Node status --- pkg/common/health.go | 39 ++++++++++++++++++++++++++++++++ pkg/scraper/component_scraper.go | 27 +++++++++++++++------- 2 files changed, 58 insertions(+), 8 deletions(-) diff --git a/pkg/common/health.go b/pkg/common/health.go index 8a303aa2..9e5b1dc0 100644 --- a/pkg/common/health.go +++ b/pkg/common/health.go @@ -58,6 +58,7 @@ const ( PersistentVolumeClaimKind = "PersistentVolumeClaim" CustomResourceDefinitionKind = "CustomResourceDefinition" PodKind = "Pod" + NodeKind = "Node" APIServiceKind = "APIService" NamespaceKind = "Namespace" HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler" @@ -369,6 +370,42 @@ func getBatchv1JobHealth(job *batchv1.Job) (*HealthStatus, error) { } +func getNodeHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { + gvk := obj.GroupVersionKind() + switch gvk { + case corev1.SchemeGroupVersion.WithKind(NodeKind): + var node corev1.Node + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &node) + if err != nil { + return nil, fmt.Errorf("failed to convert unstructured Node to typed: %w", err) + } + return getCorev1NodeHealth(&node) + default: + return nil, fmt.Errorf("unsupported Node GVK: %s", gvk) + } +} + +func getCorev1NodeHealth(node *corev1.Node) (*HealthStatus, error) { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + if condition.Status == corev1.ConditionTrue { + return &HealthStatus{ + Status: HealthStatusHealthy, + Message: condition.Message, + }, nil + } + return &HealthStatus{ + Status: HealthStatusDegraded, + Message: condition.Message, + }, nil + } + } + + return &HealthStatus{ + Status: HealthStatusUnknown, + }, nil +} + func getPodHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { gvk := obj.GroupVersionKind() switch gvk { @@ -823,6 +860,8 @@ func GetHealthCheckFuncByGroupVersionKind(gvk schema.GroupVersionKind) func(obj return getPVCHealth case PodKind: return getPodHealth + case NodeKind: + return getNodeHealth } case "batch": if gvk.Kind == JobKind { diff --git a/pkg/scraper/component_scraper.go b/pkg/scraper/component_scraper.go index d1c15426..bea55528 100644 --- a/pkg/scraper/component_scraper.go +++ b/pkg/scraper/component_scraper.go @@ -6,6 +6,8 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + "github.com/pluralsh/deployment-operator/internal/helpers" "github.com/pluralsh/deployment-operator/pkg/common" agentcommon "github.com/pluralsh/deployment-operator/pkg/common" @@ -19,7 +21,10 @@ import ( ctrclient "sigs.k8s.io/controller-runtime/pkg/client" ) -const name = "Ai Insight Component Scraper" +const ( + name = "Ai Insight Component Scraper" + nodeKind = "Node" +) var ( aiInsightComponents *AiInsightComponents @@ -93,17 +98,14 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien {Group: "apps", Version: "v1", Kind: "Deployment"}, {Group: "apps", Version: "v1", Kind: "DaemonSet"}, {Group: "apps", Version: "v1", Kind: "StatefulSet"}, - } - - manageByOperatorLabels := map[string]string{ - agentcommon.ManagedByLabel: agentcommon.AgentLabelValue, + {Group: "", Version: "v1", Kind: nodeKind}, } err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) { GetAiInsightComponents().Clear() for _, gvk := range scrapeResources { - if err := setUnhealthyComponents(ctx, k8sClient, gvk, ctrclient.MatchingLabels(manageByOperatorLabels)); err != nil { + if err := setUnhealthyComponents(ctx, k8sClient, gvk); err != nil { klog.Error(err, "can't set update component status") } } @@ -117,7 +119,7 @@ func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClien } func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error { - pager := listResources(ctx, k8sClient, gvk, opts...) + pager := listResources(ctx, k8sClient, gvk) for pager.HasNext() { items, err := pager.NextPage() if err != nil { @@ -140,7 +142,16 @@ func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk return nil } -func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) *algorithms.Pager[unstructured.Unstructured] { +func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind) *algorithms.Pager[unstructured.Unstructured] { + var opts []ctrclient.ListOption + manageByOperatorLabels := map[string]string{ + agentcommon.ManagedByLabel: agentcommon.AgentLabelValue, + } + ml := ctrclient.MatchingLabels(manageByOperatorLabels) + if gvk != corev1.SchemeGroupVersion.WithKind(nodeKind) { + opts = append(opts, ml) + } + fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) { list := &unstructured.UnstructuredList{} list.SetGroupVersionKind(gvk)