diff --git a/cmd/agent/main.go b/cmd/agent/main.go index a2e02eb9..8b870b66 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -13,6 +13,7 @@ import ( "github.com/pluralsh/deployment-operator/pkg/cache" "github.com/pluralsh/deployment-operator/pkg/client" consolectrl "github.com/pluralsh/deployment-operator/pkg/controller" + "github.com/pluralsh/deployment-operator/pkg/scraper" velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" "k8s.io/apimachinery/pkg/runtime" @@ -70,6 +71,9 @@ func main() { // Start the discovery cache in background. cache.RunDiscoveryCacheInBackgroundOrDie(ctx, discoveryClient) + // Start AI insight component scraper in background + scraper.RunAiInsightComponentScraperInBackgroundOrDie(ctx, kubeManager.GetClient()) + // Start the console manager in background. runConsoleManagerInBackgroundOrDie(ctx, consoleManager) diff --git a/go.mod b/go.mod index 38ba67cf..331b0427 100644 --- a/go.mod +++ b/go.mod @@ -28,7 +28,7 @@ require ( github.com/open-policy-agent/gatekeeper/v3 v3.17.1 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 - github.com/pluralsh/console/go/client v1.22.1 + github.com/pluralsh/console/go/client v1.22.2 github.com/pluralsh/controller-reconcile-helper v0.1.0 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 diff --git a/go.sum b/go.sum index 0fb29c90..8aafd1ee 100644 --- a/go.sum +++ b/go.sum @@ -666,8 +666,8 @@ github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rK github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pluralsh/console/go/client v1.22.1 h1:kC6QEiOkUnOGyJFugsPejzMNlM+4VgfJR1oST9Pxfkg= -github.com/pluralsh/console/go/client v1.22.1/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= +github.com/pluralsh/console/go/client v1.22.2 h1:mOWPel/oZ9vypJvF136/CWIl6P2Rj89uarHvoRJDnHI= +github.com/pluralsh/console/go/client v1.22.2/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo= github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= diff --git a/pkg/common/health.go b/pkg/common/health.go index 8a303aa2..9e5b1dc0 100644 --- a/pkg/common/health.go +++ b/pkg/common/health.go @@ -58,6 +58,7 @@ const ( PersistentVolumeClaimKind = "PersistentVolumeClaim" CustomResourceDefinitionKind = "CustomResourceDefinition" PodKind = "Pod" + NodeKind = "Node" APIServiceKind = "APIService" NamespaceKind = "Namespace" HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler" @@ -369,6 +370,42 @@ func getBatchv1JobHealth(job *batchv1.Job) (*HealthStatus, error) { } +func getNodeHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { + gvk := obj.GroupVersionKind() + switch gvk { + case corev1.SchemeGroupVersion.WithKind(NodeKind): + var node corev1.Node + err := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, &node) + if err != nil { + return nil, fmt.Errorf("failed to convert unstructured Node to typed: %w", err) + } + return getCorev1NodeHealth(&node) + default: + return nil, fmt.Errorf("unsupported Node GVK: %s", gvk) + } +} + +func getCorev1NodeHealth(node *corev1.Node) (*HealthStatus, error) { + for _, condition := range node.Status.Conditions { + if condition.Type == corev1.NodeReady { + if condition.Status == corev1.ConditionTrue { + return &HealthStatus{ + Status: HealthStatusHealthy, + Message: condition.Message, + }, nil + } + return &HealthStatus{ + Status: HealthStatusDegraded, + Message: condition.Message, + }, nil + } + } + + return &HealthStatus{ + Status: HealthStatusUnknown, + }, nil +} + func getPodHealth(obj *unstructured.Unstructured) (*HealthStatus, error) { gvk := obj.GroupVersionKind() switch gvk { @@ -823,6 +860,8 @@ func GetHealthCheckFuncByGroupVersionKind(gvk schema.GroupVersionKind) func(obj return getPVCHealth case PodKind: return getPodHealth + case NodeKind: + return getNodeHealth } case "batch": if gvk.Kind == JobKind { diff --git a/pkg/ping/build.go b/pkg/ping/build.go index d7d6224a..15446b8c 100644 --- a/pkg/ping/build.go +++ b/pkg/ping/build.go @@ -4,15 +4,41 @@ import ( "strings" console "github.com/pluralsh/console/go/client" + "github.com/pluralsh/deployment-operator/pkg/scraper" "github.com/samber/lo" "k8s.io/apimachinery/pkg/version" + "k8s.io/klog/v2" ) func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing { vs := strings.Split(info.GitVersion, "-") - return console.ClusterPing{ + cp := console.ClusterPing{ CurrentVersion: strings.TrimPrefix(vs[0], "v"), Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))), KubeletVersion: minKubeletVersion, } + if scraper.GetAiInsightComponents().IsFresh() && scraper.GetAiInsightComponents().Len() > 0 { + klog.Info("found ", scraper.GetAiInsightComponents().Len(), " fresh AI insight components") + + scraper.GetAiInsightComponents().SetFresh(false) + insightComponents := make([]*console.ClusterInsightComponentAttributes, 0) + + for _, value := range scraper.GetAiInsightComponents().GetItems() { + attr := &console.ClusterInsightComponentAttributes{ + Version: value.Gvk.Version, + Kind: value.Gvk.Kind, + Name: value.Name, + } + if len(value.Gvk.Group) > 0 { + attr.Group = lo.ToPtr(value.Gvk.Group) + } + if len(value.Namespace) > 0 { + attr.Namespace = lo.ToPtr(value.Namespace) + } + insightComponents = append(insightComponents, attr) + } + cp.InsightComponents = insightComponents + } + + return cp } diff --git a/pkg/scraper/component_scraper.go b/pkg/scraper/component_scraper.go new file mode 100644 index 00000000..bea55528 --- /dev/null +++ b/pkg/scraper/component_scraper.go @@ -0,0 +1,175 @@ +package scraper + +import ( + "context" + "fmt" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + + "github.com/pluralsh/deployment-operator/internal/helpers" + "github.com/pluralsh/deployment-operator/pkg/common" + agentcommon "github.com/pluralsh/deployment-operator/pkg/common" + common2 "github.com/pluralsh/deployment-operator/pkg/controller/common" + "github.com/pluralsh/polly/algorithms" + "github.com/pluralsh/polly/containers" + "github.com/samber/lo" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" + ctrclient "sigs.k8s.io/controller-runtime/pkg/client" +) + +const ( + name = "Ai Insight Component Scraper" + nodeKind = "Node" +) + +var ( + aiInsightComponents *AiInsightComponents +) + +func init() { + aiInsightComponents = &AiInsightComponents{ + items: containers.NewSet[Component](), + } +} + +type AiInsightComponents struct { + mu sync.RWMutex + items containers.Set[Component] + fresh bool +} + +type Component struct { + Gvk schema.GroupVersionKind + Name string + Namespace string +} + +func (c Component) String() string { + return fmt.Sprintf("GVK=%s, Name=%s, Namespace=%s", c.Gvk.String(), c.Name, c.Namespace) +} + +func GetAiInsightComponents() *AiInsightComponents { + return aiInsightComponents +} + +func (s *AiInsightComponents) Clear() { + s.mu.Lock() + defer s.mu.Unlock() + s.items = containers.NewSet[Component]() +} + +func (s *AiInsightComponents) AddItem(c Component) { + s.mu.Lock() + defer s.mu.Unlock() + s.items.Add(c) +} + +func (s *AiInsightComponents) GetItems() []Component { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.List() +} + +func (s *AiInsightComponents) Len() int { + s.mu.RLock() + defer s.mu.RUnlock() + return s.items.Len() +} + +func (s *AiInsightComponents) IsFresh() bool { + s.mu.RLock() + defer s.mu.RUnlock() + return s.fresh +} + +func (s *AiInsightComponents) SetFresh(f bool) { + s.mu.Lock() + defer s.mu.Unlock() + s.fresh = f +} + +func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) { + klog.Info("starting ", name) + scrapeResources := []schema.GroupVersionKind{ + {Group: "apps", Version: "v1", Kind: "Deployment"}, + {Group: "apps", Version: "v1", Kind: "DaemonSet"}, + {Group: "apps", Version: "v1", Kind: "StatefulSet"}, + {Group: "", Version: "v1", Kind: nodeKind}, + } + + err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, func(_ context.Context) (done bool, err error) { + GetAiInsightComponents().Clear() + + for _, gvk := range scrapeResources { + if err := setUnhealthyComponents(ctx, k8sClient, gvk); err != nil { + klog.Error(err, "can't set update component status") + } + } + GetAiInsightComponents().SetFresh(true) + + return false, nil + }) + if err != nil { + panic(fmt.Errorf("failed to start %s in background: %w", name, err)) + } +} + +func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error { + pager := listResources(ctx, k8sClient, gvk) + for pager.HasNext() { + items, err := pager.NextPage() + if err != nil { + return err + } + for _, item := range items { + health, err := common.GetResourceHealth(&item) + if err != nil { + return err + } + if health.Status == common.HealthStatusDegraded { + GetAiInsightComponents().AddItem(Component{ + Gvk: gvk, + Name: item.GetName(), + Namespace: item.GetNamespace(), + }) + } + } + } + return nil +} + +func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind) *algorithms.Pager[unstructured.Unstructured] { + var opts []ctrclient.ListOption + manageByOperatorLabels := map[string]string{ + agentcommon.ManagedByLabel: agentcommon.AgentLabelValue, + } + ml := ctrclient.MatchingLabels(manageByOperatorLabels) + if gvk != corev1.SchemeGroupVersion.WithKind(nodeKind) { + opts = append(opts, ml) + } + + fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) { + list := &unstructured.UnstructuredList{} + list.SetGroupVersionKind(gvk) + + if page != nil { + opts = append(opts, ctrclient.Continue(*page)) + } + opts = append(opts, ctrclient.Limit(size)) + // List resources + if err := k8sClient.List(ctx, list, opts...); err != nil { + return nil, nil, fmt.Errorf("failed to list resources: %w", err) + } + pageInfo := &algorithms.PageInfo{ + HasNext: list.GetContinue() != "", + After: lo.ToPtr(list.GetContinue()), + PageSize: size, + } + return list.Items, pageInfo, nil + } + return algorithms.NewPager[unstructured.Unstructured](common2.DefaultPageSize, fetch) +}