From 048ea45c196950b9d35e468aa7faa705ef219b44 Mon Sep 17 00:00:00 2001 From: Lukasz Zajaczkowski Date: Thu, 19 Dec 2024 11:31:29 +0100 Subject: [PATCH] get node and control plane costs --- cmd/agent/args/args.go | 5 + cmd/agent/kubernetes.go | 2 + cmd/agent/main.go | 2 +- go.mod | 2 +- go.sum | 4 +- .../kubecostextractor_controller.go | 131 ++++++++++++++---- 6 files changed, 117 insertions(+), 29 deletions(-) diff --git a/cmd/agent/args/args.go b/cmd/agent/args/args.go index c33f7f86..55bf387f 100644 --- a/cmd/agent/args/args.go +++ b/cmd/agent/args/args.go @@ -57,6 +57,7 @@ var ( argLocal = flag.Bool("local", false, "Whether you're running the operator locally.") argProfiler = flag.Bool("profiler", false, "Enable pprof handler. By default it will be exposed on localhost:7777 under '/debug/pprof'") argDisableResourceCache = flag.Bool("disable-resource-cache", false, "Control whether resource cache should be enabled or not.") + argEnableKubecostProxy = flag.Bool("enable-kubecost-proxy", false, "If set, will proxy a Kubecost API request through the K8s API server.") argMaxConcurrentReconciles = flag.Int("max-concurrent-reconciles", 20, "Maximum number of concurrent reconciles which can be run.") argResyncSeconds = flag.Int("resync-seconds", 300, "Resync duration in seconds.") @@ -112,6 +113,10 @@ func DisableHelmTemplateDryRunServer() bool { return *argDisableHelmTemplateDryRunServer } +func EnableKubecostProxy() bool { + return *argEnableKubecostProxy +} + func EnableHelmDependencyUpdate() bool { return *argEnableHelmDependencyUpdate } diff --git a/cmd/agent/kubernetes.go b/cmd/agent/kubernetes.go index 8c8a5d3f..2225322f 100644 --- a/cmd/agent/kubernetes.go +++ b/cmd/agent/kubernetes.go @@ -102,6 +102,7 @@ func registerKubeReconcilersOrDie( config *rest.Config, extConsoleClient consoleclient.Client, discoveryClient discovery.DiscoveryInterface, + enableKubecostProxy bool, ) { rolloutsClient, dynamicClient, kubeClient, metricsClient := initKubeClientsOrDie(config) @@ -250,6 +251,7 @@ func registerKubeReconcilersOrDie( KubeClient: kubeClient, ExtConsoleClient: extConsoleClient, Tasks: cmap.New[context.CancelFunc](), + Proxy: enableKubecostProxy, }).SetupWithManager(manager); err != nil { setupLog.Error(err, "unable to create controller", "controller", "MetricsAggregate") } diff --git a/cmd/agent/main.go b/cmd/agent/main.go index 13479046..1247700d 100644 --- a/cmd/agent/main.go +++ b/cmd/agent/main.go @@ -61,7 +61,7 @@ func main() { cache.InitGateCache(args.ControllerCacheTTL(), extConsoleClient) registerConsoleReconcilersOrDie(consoleManager, config, kubeManager.GetClient(), kubeManager.GetScheme(), extConsoleClient) - registerKubeReconcilersOrDie(ctx, kubeManager, consoleManager, config, extConsoleClient, discoveryClient) + registerKubeReconcilersOrDie(ctx, kubeManager, consoleManager, config, extConsoleClient, discoveryClient, args.EnableKubecostProxy()) //+kubebuilder:scaffold:builder diff --git a/go.mod b/go.mod index c064f61a..c2c4b913 100644 --- a/go.mod +++ b/go.mod @@ -33,7 +33,7 @@ require ( github.com/opencost/opencost/core v0.0.0-20241216191657-30e5d9a27f41 github.com/orcaman/concurrent-map/v2 v2.0.1 github.com/pkg/errors v0.9.1 - github.com/pluralsh/console/go/client v1.25.2 + github.com/pluralsh/console/go/client v1.25.3 github.com/pluralsh/controller-reconcile-helper v0.1.0 github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 github.com/pluralsh/polly v0.1.10 diff --git a/go.sum b/go.sum index 70191a21..3cdeb436 100644 --- a/go.sum +++ b/go.sum @@ -1190,8 +1190,8 @@ github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c/go.mod h1:7rwL4CYBLnjL github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pluralsh/console/go/client v1.25.2 h1:Ha/ZF5t+ilJ0MVZPeDO46tK7HyKmi74c1DIHPA2sDY0= -github.com/pluralsh/console/go/client v1.25.2/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= +github.com/pluralsh/console/go/client v1.25.3 h1:6MvNz0AuxGwH+zWQyXakMCKf/UG8YfalZBLED9pWLoU= +github.com/pluralsh/console/go/client v1.25.3/go.mod h1:lpoWASYsM9keNePS3dpFiEisUHEfObIVlSL3tzpKn8k= github.com/pluralsh/controller-reconcile-helper v0.1.0 h1:BV3dYZFH5rn8ZvZjtpkACSv/GmLEtRftNQj/Y4ddHEo= github.com/pluralsh/controller-reconcile-helper v0.1.0/go.mod h1:RxAbvSB4/jkvx616krCdNQXPbpGJXW3J1L3rASxeFOA= github.com/pluralsh/gophoenix v0.1.3-0.20231201014135-dff1b4309e34 h1:ab2PN+6if/Aq3/sJM0AVdy1SYuMAnq4g20VaKhTm/Bw= diff --git a/internal/controller/kubecostextractor_controller.go b/internal/controller/kubecostextractor_controller.go index d32cf168..223a2eb5 100644 --- a/internal/controller/kubecostextractor_controller.go +++ b/internal/controller/kubecostextractor_controller.go @@ -22,6 +22,7 @@ import ( "encoding/json" "fmt" "io" + "math/rand" "net/http" "net/url" "strconv" @@ -50,6 +51,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +const kubeCostJitter = time.Minute * 5 + var kubecostResourceTypes = []string{"deployment", "statefulset", "daemonset"} // KubecostExtractorReconciler reconciles a KubecostExtractor object @@ -59,6 +62,7 @@ type KubecostExtractorReconciler struct { KubeClient kubernetes.Interface ExtConsoleClient consoleclient.Client Tasks cmap.ConcurrentMap[string, context.CancelFunc] + Proxy bool } func (r *KubecostExtractorReconciler) RunOnInterval(ctx context.Context, key string, interval time.Duration, condition wait.ConditionWithContextFunc) { @@ -69,7 +73,7 @@ func (r *KubecostExtractorReconciler) RunOnInterval(ctx context.Context, key str r.Tasks.Set(key, cancel) go func() { - _ = wait.PollUntilContextCancel(ctxCancel, interval, true, condition) + _ = wait.PollUntilContextCancel(ctxCancel, interval+time.Duration(rand.Int63n(int64(kubeCostJitter))), true, condition) }() } @@ -134,24 +138,21 @@ func (r *KubecostExtractorReconciler) Reconcile(ctx context.Context, req ctrl.Re reterr = err } }() - clusterCostAttr, err := r.getClusterCost(ctx, kubecostService, kubecost.Spec.GetPort(), kubecost.Spec.GetInterval()) + clusterCostAttr, err := r.getClusterCost(ctx, kubecostService, kubecost.Spec.GetPort()) if err != nil { logger.Error(err, "Unable to fetch cluster cost") utils.MarkCondition(kubecost.SetCondition, v1alpha1.ReadyConditionType, v1.ConditionFalse, v1alpha1.ErrorConditionReason, err.Error()) - return false, nil } - namespacesCostAtrr, err := r.getNamespacesCost(ctx, kubecostService, kubecost.Spec.GetPort(), kubecost.Spec.GetInterval()) + namespacesCostAtrr, err := r.getNamespacesCost(ctx, kubecostService, kubecost.Spec.GetPort()) if err != nil { logger.Error(err, "Unable to fetch namespacesCostAtrr cost") utils.MarkCondition(kubecost.SetCondition, v1alpha1.ReadyConditionType, v1.ConditionFalse, v1alpha1.ErrorConditionReason, err.Error()) - return false, nil } - recommendations, err := r.getRecommendationAttributes(ctx, kubecostService, kubecost.Spec.GetPort(), kubecost.Spec.GetInterval(), recommendationThreshold) + recommendations, err := r.getRecommendationAttributes(ctx, kubecostService, kubecost.Spec.GetPort(), recommendationThreshold) if err != nil { logger.Error(err, "Unable to fetch recommendations") - // utils.MarkCondition(kubecost.SetCondition, v1alpha1.ReadyConditionType, v1.ConditionFalse, v1alpha1.ErrorConditionReason, err.Error()) - // return false, nil + utils.MarkCondition(kubecost.SetCondition, v1alpha1.ReadyConditionType, v1.ConditionFalse, v1alpha1.ErrorConditionReason, err.Error()) } // nothing for specified time window @@ -193,8 +194,8 @@ func (r *KubecostExtractorReconciler) fetch(host, path string, params map[string ResponseHeaderTimeout: 120 * time.Second, } - client := &http.Client{Transport: tr} - resp, err := client.Get(fmt.Sprintf("http://%s%s%s", host, path, query)) + httpClient := &http.Client{Transport: tr} + resp, err := httpClient.Get(fmt.Sprintf("http://%s%s%s", host, path, query)) if err != nil { return nil, err } @@ -216,18 +217,25 @@ func (r *KubecostExtractorReconciler) getAllocation(ctx context.Context, srv *co "accumulate": "true", } - bytes, err := r.fetch(fmt.Sprintf("%s.%s:%s", srv.Name, srv.Namespace, servicePort), "/model/allocation", queryParams) + var response []byte + var err error + if r.Proxy { + response, err = r.KubeClient.CoreV1().Services(srv.Namespace).ProxyGet("", srv.Name, servicePort, "/model/allocation", queryParams).DoRaw(ctx) + } else { + response, err = r.fetch(fmt.Sprintf("%s.%s:%s", srv.Name, srv.Namespace, servicePort), "/model/allocation", queryParams) + + } if err != nil { return nil, err } ar := &allocationResponse{} - if err = json.Unmarshal(bytes, ar); err != nil { + if err = json.Unmarshal(response, ar); err != nil { return nil, err } return ar, nil } -func (r *KubecostExtractorReconciler) getRecommendationAttributes(ctx context.Context, srv *corev1.Service, servicePort string, interval time.Duration, recommendationThreshold float64) ([]*console.ClusterRecommendationAttributes, error) { +func (r *KubecostExtractorReconciler) getRecommendationAttributes(ctx context.Context, srv *corev1.Service, servicePort string, recommendationThreshold float64) ([]*console.ClusterRecommendationAttributes, error) { var result []*console.ClusterRecommendationAttributes for _, resourceType := range kubecostResourceTypes { ar, err := r.getAllocation(ctx, srv, servicePort, resourceType) @@ -256,7 +264,7 @@ func (r *KubecostExtractorReconciler) getRecommendationAttributes(ctx context.Co return result, nil } -func (r *KubecostExtractorReconciler) getNamespacesCost(ctx context.Context, srv *corev1.Service, servicePort string, interval time.Duration) ([]*console.CostAttributes, error) { +func (r *KubecostExtractorReconciler) getNamespacesCost(ctx context.Context, srv *corev1.Service, servicePort string) ([]*console.CostAttributes, error) { var result []*console.CostAttributes ar, err := r.getAllocation(ctx, srv, servicePort, "namespace") if err != nil { @@ -265,15 +273,15 @@ func (r *KubecostExtractorReconciler) getNamespacesCost(ctx context.Context, srv if ar.Code != http.StatusOK { return nil, fmt.Errorf("unexpected status code: %d", ar.Code) } - for _, clusterCosts := range ar.Data { - if clusterCosts == nil { + for _, namespaceCosts := range ar.Data { + if namespaceCosts == nil { continue } - for namespace, allocation := range clusterCosts { + for namespace, allocation := range namespaceCosts { if namespace == opencost.IdleSuffix { continue } - attr := convertCostAttributes(allocation) + attr := convertCostAttributes(allocation, nil, nil) attr.Namespace = lo.ToPtr(namespace) result = append(result, attr) } @@ -282,17 +290,20 @@ func (r *KubecostExtractorReconciler) getNamespacesCost(ctx context.Context, srv return result, nil } -func (r *KubecostExtractorReconciler) getClusterCost(ctx context.Context, srv *corev1.Service, servicePort string, interval time.Duration) (*console.CostAttributes, error) { - bytes, err := r.fetch(fmt.Sprintf("%s.%s:%s", srv.Name, srv.Namespace, servicePort), "/model/clusterInfo", nil) +func (r *KubecostExtractorReconciler) getClusterCost(ctx context.Context, srv *corev1.Service, servicePort string) (*console.CostAttributes, error) { + controlPlaneCost, err := r.getControlPlaneCost(ctx, srv, servicePort) if err != nil { return nil, err } - var resp clusterinfoResponse - err = json.Unmarshal(bytes, &resp) + nodeCost, err := r.getNodeCost(ctx, srv, servicePort) if err != nil { return nil, err } + clusterID, err := r.getClusterID(ctx, srv, servicePort) + if err != nil { + return nil, err + } ar, err := r.getAllocation(ctx, srv, servicePort, "cluster") if err != nil { return nil, err @@ -304,15 +315,83 @@ func (r *KubecostExtractorReconciler) getClusterCost(ctx context.Context, srv *c if clusterCosts == nil { continue } - clusterCostAllocation, ok := clusterCosts[resp.Data.ClusterID] + + allocation, ok := clusterCosts[clusterID] if ok { - return convertCostAttributes(clusterCostAllocation), nil + return convertCostAttributes(allocation, nodeCost, controlPlaneCost), nil } } return nil, nil } +func (r *KubecostExtractorReconciler) getControlPlaneCost(ctx context.Context, srv *corev1.Service, servicePort string) (*float64, error) { + ar, err := r.getAllocation(ctx, srv, servicePort, "controller") + if err != nil { + return nil, err + } + if ar.Code != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", ar.Code) + } + for _, controllerCosts := range ar.Data { + if controllerCosts == nil { + continue + } + allocation, ok := controllerCosts[opencost.UnallocatedSuffix] + if ok { + return lo.ToPtr(allocation.TotalCost()), nil + } + } + + return nil, nil +} + +func (r *KubecostExtractorReconciler) getNodeCost(ctx context.Context, srv *corev1.Service, servicePort string) (*float64, error) { + var totalNodeCost float64 + ar, err := r.getAllocation(ctx, srv, servicePort, "node") + if err != nil { + return nil, err + } + if ar.Code != http.StatusOK { + return nil, fmt.Errorf("unexpected status code: %d", ar.Code) + } + for _, nodeCosts := range ar.Data { + if nodeCosts == nil { + continue + } + for name, allocation := range nodeCosts { + if name == opencost.IdleSuffix { + continue + } + totalNodeCost += allocation.TotalCost() + } + } + if totalNodeCost > 0 { + return &totalNodeCost, nil + } + return nil, nil +} + +func (r *KubecostExtractorReconciler) getClusterID(ctx context.Context, srv *corev1.Service, servicePort string) (string, error) { + var response []byte + var err error + if r.Proxy { + response, err = r.KubeClient.CoreV1().Services(srv.Namespace).ProxyGet("", srv.Name, servicePort, "/model/clusterInfo", nil).DoRaw(ctx) + } else { + response, err = r.fetch(fmt.Sprintf("%s.%s:%s", srv.Name, srv.Namespace, servicePort), "/model/clusterInfo", nil) + + } + if err != nil { + return "", err + } + var resp clusterinfoResponse + err = json.Unmarshal(response, &resp) + if err != nil { + return "", err + } + return resp.Data.ClusterID, nil +} + func (r *KubecostExtractorReconciler) getObjectInfo(ctx context.Context, resourceType console.ScalingRecommendationType, namespace, name string) (container, serviceId *string, err error) { gvk := schema.GroupVersionKind{ Group: "apps", @@ -394,7 +473,7 @@ func (r *KubecostExtractorReconciler) convertClusterRecommendationAttributes(ctx return result } -func convertCostAttributes(allocation opencost.Allocation) *console.CostAttributes { +func convertCostAttributes(allocation opencost.Allocation, nodeCost, controlPlaneCost *float64) *console.CostAttributes { attr := &console.CostAttributes{ Memory: lo.ToPtr(allocation.RAMBytes()), CPU: lo.ToPtr(allocation.CPUCores()), @@ -405,6 +484,8 @@ func convertCostAttributes(allocation opencost.Allocation) *console.CostAttribut MemoryCost: lo.ToPtr(allocation.RAMCost), GpuCost: lo.ToPtr(allocation.GPUCost), LoadBalancerCost: lo.ToPtr(allocation.LoadBalancerCost), + ControlPlaneCost: controlPlaneCost, + NodeCost: nodeCost, } if allocation.GPUAllocation != nil { attr.GpuUtil = allocation.GPUAllocation.GPUUsageAverage