Skip to content

Commit

Permalink
feat: Add AI insight component scraper (#307)
Browse files Browse the repository at this point in the history
* cluster ai insight component scraper

* improve fetching components

* add Node status
  • Loading branch information
zreigz authored Oct 30, 2024
1 parent a007826 commit 77118e5
Show file tree
Hide file tree
Showing 6 changed files with 248 additions and 4 deletions.
4 changes: 4 additions & 0 deletions cmd/agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pluralsh/deployment-operator/pkg/cache"
"github.com/pluralsh/deployment-operator/pkg/client"
consolectrl "github.com/pluralsh/deployment-operator/pkg/controller"
"github.com/pluralsh/deployment-operator/pkg/scraper"
velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -70,6 +71,9 @@ func main() {
// Start the discovery cache in background.
cache.RunDiscoveryCacheInBackgroundOrDie(ctx, discoveryClient)

// Start the AI insight component scraper in background.
scraper.RunAiInsightComponentScraperInBackgroundOrDie(ctx, kubeManager.GetClient())

// Start the console manager in background.
runConsoleManagerInBackgroundOrDie(ctx, consoleManager)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ require (
github.com/open-policy-agent/gatekeeper/v3 v3.17.1
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/pkg/errors v0.9.1
github.com/pluralsh/console/go/client v1.22.1
github.com/pluralsh/console/go/client v1.22.2
github.com/pluralsh/controller-reconcile-helper v0.1.0
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34
github.com/pluralsh/polly v0.1.10
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -666,8 +666,8 @@ github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rK
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pluralsh/console/go/client v1.22.1 h1:kC6QEiOkUnOGyJFugsPejzMNlM+4VgfJR1oST9Pxfkg=
github.com/pluralsh/console/go/client v1.22.1/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k=
github.com/pluralsh/console/go/client v1.22.2 h1:mOWPel/oZ9vypJvF136/CWIl6P2Rj89uarHvoRJDnHI=
github.com/pluralsh/console/go/client v1.22.2/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k=
github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo=
github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA=
github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw=
Expand Down
39 changes: 39 additions & 0 deletions pkg/common/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ const (
PersistentVolumeClaimKind = "PersistentVolumeClaim"
CustomResourceDefinitionKind = "CustomResourceDefinition"
PodKind = "Pod"
NodeKind = "Node"
APIServiceKind = "APIService"
NamespaceKind = "Namespace"
HorizontalPodAutoscalerKind = "HorizontalPodAutoscaler"
Expand Down Expand Up @@ -369,6 +370,42 @@ func getBatchv1JobHealth(job *batchv1.Job) (*HealthStatus, error) {

}

// getNodeHealth converts an unstructured core/v1 Node into its typed form and
// delegates the health evaluation to getCorev1NodeHealth.
//
// It returns an error when the object's GVK is not core/v1 Node or when the
// conversion to the typed object fails.
func getNodeHealth(obj *unstructured.Unstructured) (*HealthStatus, error) {
	gvk := obj.GroupVersionKind()

	// Only the core/v1 Node representation is understood here.
	if gvk != corev1.SchemeGroupVersion.WithKind(NodeKind) {
		return nil, fmt.Errorf("unsupported Node GVK: %s", gvk)
	}

	typed := &corev1.Node{}
	if convErr := runtime.DefaultUnstructuredConverter.FromUnstructured(obj.Object, typed); convErr != nil {
		return nil, fmt.Errorf("failed to convert unstructured Node to typed: %w", convErr)
	}

	return getCorev1NodeHealth(typed)
}

// getCorev1NodeHealth derives a health status from the node's Ready condition.
//
// The first condition of type NodeReady decides the outcome: status True maps
// to HealthStatusHealthy, any other status maps to HealthStatusDegraded, and
// the condition message is copied into the result. When the node reports no
// Ready condition the result is HealthStatusUnknown with an empty message.
// The returned error is always nil.
func getCorev1NodeHealth(node *corev1.Node) (*HealthStatus, error) {
	result := &HealthStatus{Status: HealthStatusUnknown}

	for i := range node.Status.Conditions {
		cond := &node.Status.Conditions[i]
		if cond.Type != corev1.NodeReady {
			continue
		}

		result.Message = cond.Message
		if cond.Status == corev1.ConditionTrue {
			result.Status = HealthStatusHealthy
		} else {
			result.Status = HealthStatusDegraded
		}
		// Only the first Ready condition is considered.
		break
	}

	return result, nil
}

func getPodHealth(obj *unstructured.Unstructured) (*HealthStatus, error) {
gvk := obj.GroupVersionKind()
switch gvk {
Expand Down Expand Up @@ -823,6 +860,8 @@ func GetHealthCheckFuncByGroupVersionKind(gvk schema.GroupVersionKind) func(obj
return getPVCHealth
case PodKind:
return getPodHealth
case NodeKind:
return getNodeHealth
}
case "batch":
if gvk.Kind == JobKind {
Expand Down
28 changes: 27 additions & 1 deletion pkg/ping/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,41 @@ import (
"strings"

console "github.com/pluralsh/console/go/client"
"github.com/pluralsh/deployment-operator/pkg/scraper"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/version"
"k8s.io/klog/v2"
)

func pingAttributes(info *version.Info, pods []string, minKubeletVersion *string) console.ClusterPing {
vs := strings.Split(info.GitVersion, "-")
return console.ClusterPing{
cp := console.ClusterPing{
CurrentVersion: strings.TrimPrefix(vs[0], "v"),
Distro: lo.ToPtr(findDistro(append(pods, info.GitVersion))),
KubeletVersion: minKubeletVersion,
}
if scraper.GetAiInsightComponents().IsFresh() && scraper.GetAiInsightComponents().Len() > 0 {
klog.Info("found ", scraper.GetAiInsightComponents().Len(), " fresh AI insight components")

scraper.GetAiInsightComponents().SetFresh(false)
insightComponents := make([]*console.ClusterInsightComponentAttributes, 0)

for _, value := range scraper.GetAiInsightComponents().GetItems() {
attr := &console.ClusterInsightComponentAttributes{
Version: value.Gvk.Version,
Kind: value.Gvk.Kind,
Name: value.Name,
}
if len(value.Gvk.Group) > 0 {
attr.Group = lo.ToPtr(value.Gvk.Group)
}
if len(value.Namespace) > 0 {
attr.Namespace = lo.ToPtr(value.Namespace)
}
insightComponents = append(insightComponents, attr)
}
cp.InsightComponents = insightComponents
}

return cp
}
175 changes: 175 additions & 0 deletions pkg/scraper/component_scraper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package scraper

import (
"context"
"fmt"
"sync"
"time"

corev1 "k8s.io/api/core/v1"

"github.com/pluralsh/deployment-operator/internal/helpers"
"github.com/pluralsh/deployment-operator/pkg/common"
agentcommon "github.com/pluralsh/deployment-operator/pkg/common"
common2 "github.com/pluralsh/deployment-operator/pkg/controller/common"
"github.com/pluralsh/polly/algorithms"
"github.com/pluralsh/polly/containers"
"github.com/samber/lo"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
ctrclient "sigs.k8s.io/controller-runtime/pkg/client"
)

const (
// name is the human-readable scraper name used in the start-up log line and
// in the panic message when the background poller cannot be started.
name = "Ai Insight Component Scraper"
// nodeKind is the Kubernetes kind string for Node objects. Nodes are scraped
// as core/v1 resources and are listed without the managed-by label filter.
nodeKind = "Node"
)

// aiInsightComponents is the package-wide store returned by
// GetAiInsightComponents. It is created together with an empty component set
// during package initialisation, so the accessor never hands out a nil store.
var aiInsightComponents = &AiInsightComponents{
	items: containers.NewSet[Component](),
}

// AiInsightComponents is a mutex-guarded set of components together with a
// freshness flag. All access goes through its methods, which take the lock.
type AiInsightComponents struct {
// mu guards items and fresh.
mu sync.RWMutex
// items holds the components recorded through AddItem since the last Clear.
items containers.Set[Component]
// fresh is written by SetFresh and read by IsFresh; the scraper sets it to
// true after finishing a pass.
fresh bool
}

// Component identifies a single Kubernetes object by its group/version/kind,
// name and namespace.
type Component struct {
// Gvk is the group/version/kind the object was listed under.
Gvk schema.GroupVersionKind
// Name is the object's name.
Name string
// Namespace is the object's namespace; it may be empty.
Namespace string
}

// String renders the component as "GVK=<gvk>, Name=<name>, Namespace=<namespace>".
func (c Component) String() string {
	gvkText := c.Gvk.String()
	return "GVK=" + gvkText + ", Name=" + c.Name + ", Namespace=" + c.Namespace
}

// GetAiInsightComponents returns the package-level AiInsightComponents store.
// Every caller receives the same pointer.
func GetAiInsightComponents() *AiInsightComponents {
return aiInsightComponents
}

// Clear replaces the stored set with a new empty one while holding the write
// lock. It does not touch the fresh flag.
func (s *AiInsightComponents) Clear() {
s.mu.Lock()
defer s.mu.Unlock()
s.items = containers.NewSet[Component]()
}

// AddItem adds c to the stored set while holding the write lock.
func (s *AiInsightComponents) AddItem(c Component) {
s.mu.Lock()
defer s.mu.Unlock()
s.items.Add(c)
}

// GetItems returns the stored components as a slice, as produced by the set's
// List method, while holding the read lock.
func (s *AiInsightComponents) GetItems() []Component {
s.mu.RLock()
defer s.mu.RUnlock()
return s.items.List()
}

// Len returns the number of stored components while holding the read lock.
func (s *AiInsightComponents) Len() int {
s.mu.RLock()
defer s.mu.RUnlock()
return s.items.Len()
}

// IsFresh returns the current value of the fresh flag while holding the read
// lock.
func (s *AiInsightComponents) IsFresh() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.fresh
}

// SetFresh sets the fresh flag to f while holding the write lock.
func (s *AiInsightComponents) SetFresh(f bool) {
s.mu.Lock()
defer s.mu.Unlock()
s.fresh = f
}

// RunAiInsightComponentScraperInBackgroundOrDie registers the AI insight
// component scrape with helpers.BackgroundPollUntilContextCancel using a
// 15 minute interval, and panics if that helper returns an error.
//
// Each scrape pass marks the shared store as not fresh, clears it, records the
// degraded Deployments, DaemonSets, StatefulSets and Nodes, and only then marks
// the store as fresh again. Readers that check IsFresh therefore never pick up
// a half-populated set. A failure for one GVK is logged and does not prevent
// the remaining GVKs from being scanned.
//
// NOTE(review): the exact meaning of the two boolean arguments passed to
// BackgroundPollUntilContextCancel is defined by that helper — confirm there.
func RunAiInsightComponentScraperInBackgroundOrDie(ctx context.Context, k8sClient ctrclient.Client) {
	klog.Info("starting ", name)
	scrapeResources := []schema.GroupVersionKind{
		{Group: "apps", Version: "v1", Kind: "Deployment"},
		{Group: "apps", Version: "v1", Kind: "DaemonSet"},
		{Group: "apps", Version: "v1", Kind: "StatefulSet"},
		{Group: "", Version: "v1", Kind: nodeKind},
	}

	scrape := func(_ context.Context) (done bool, err error) {
		components := GetAiInsightComponents()

		// Withdraw the previous result before rebuilding it. The flag can
		// still be true here (for example when the last pass found nothing),
		// and a reader must not see the set while it is being refilled.
		components.SetFresh(false)
		components.Clear()

		for _, gvk := range scrapeResources {
			if err := setUnhealthyComponents(ctx, k8sClient, gvk); err != nil {
				// ErrorS keeps the error and the message separate; klog.Error
				// would print them glued together without a separator.
				klog.ErrorS(err, "can't update component status", "gvk", gvk.String())
			}
		}
		components.SetFresh(true)

		// Never report done: the scrape is meant to keep repeating.
		return false, nil
	}

	if err := helpers.BackgroundPollUntilContextCancel(ctx, 15*time.Minute, true, true, scrape); err != nil {
		panic(fmt.Errorf("failed to start %s in background: %w", name, err))
	}
}

// setUnhealthyComponents pages through all resources of the given GVK and adds
// every resource whose health status is HealthStatusDegraded to the shared
// AiInsightComponents store.
//
// It returns the first error produced while fetching a page or evaluating a
// resource's health; components added before the error stay in the store.
//
// The opts parameter is currently not forwarded to the listing; it is kept so
// existing callers keep compiling.
func setUnhealthyComponents(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind, opts ...ctrclient.ListOption) error {
	components := GetAiInsightComponents()
	pager := listResources(ctx, k8sClient, gvk)

	for pager.HasNext() {
		items, err := pager.NextPage()
		if err != nil {
			return err
		}

		// Index into the page instead of taking the address of a range
		// variable: no per-item copy and no pointer to a reused loop variable.
		for i := range items {
			item := &items[i]

			health, err := common.GetResourceHealth(item)
			if err != nil {
				return fmt.Errorf("checking health of %s %s/%s: %w", gvk.Kind, item.GetNamespace(), item.GetName(), err)
			}
			// GetResourceHealth's contract is not visible here; guard against a
			// nil status so a (nil, nil) result cannot cause a nil dereference.
			if health == nil || health.Status != common.HealthStatusDegraded {
				continue
			}

			components.AddItem(Component{
				Gvk:       gvk,
				Name:      item.GetName(),
				Namespace: item.GetNamespace(),
			})
		}
	}

	return nil
}

// listResources returns a pager that lists resources of the given GVK through
// k8sClient, common2.DefaultPageSize items at a time.
//
// Every GVK except core/v1 Node is restricted to objects carrying the
// agentcommon.ManagedByLabel=agentcommon.AgentLabelValue label; Nodes are
// listed without a label filter.
//
// Each page request builds its own option slice from the fixed base options.
// Appending to a slice shared across calls would add one more Continue and
// Limit option on every page, growing it for the whole lifetime of the pager.
func listResources(ctx context.Context, k8sClient ctrclient.Client, gvk schema.GroupVersionKind) *algorithms.Pager[unstructured.Unstructured] {
	// Options that are identical for every page.
	var baseOpts []ctrclient.ListOption
	if gvk != corev1.SchemeGroupVersion.WithKind(nodeKind) {
		manageByOperatorLabels := map[string]string{
			agentcommon.ManagedByLabel: agentcommon.AgentLabelValue,
		}
		baseOpts = append(baseOpts, ctrclient.MatchingLabels(manageByOperatorLabels))
	}

	fetch := func(page *string, size int64) ([]unstructured.Unstructured, *algorithms.PageInfo, error) {
		list := &unstructured.UnstructuredList{}
		list.SetGroupVersionKind(gvk)

		// Fresh per-call slice: base options plus this page's cursor and limit.
		opts := make([]ctrclient.ListOption, 0, len(baseOpts)+2)
		opts = append(opts, baseOpts...)
		if page != nil {
			opts = append(opts, ctrclient.Continue(*page))
		}
		opts = append(opts, ctrclient.Limit(size))

		if err := k8sClient.List(ctx, list, opts...); err != nil {
			return nil, nil, fmt.Errorf("failed to list resources: %w", err)
		}

		// An empty continue token means the server has no further pages.
		next := list.GetContinue()
		pageInfo := &algorithms.PageInfo{
			HasNext:  next != "",
			After:    lo.ToPtr(next),
			PageSize: size,
		}
		return list.Items, pageInfo, nil
	}

	return algorithms.NewPager[unstructured.Unstructured](common2.DefaultPageSize, fetch)
}

0 comments on commit 77118e5

Please sign in to comment.