node usage metrics not being collected is not a fatal error
laverya committed Oct 19, 2023
1 parent 57b7ffc commit 819a9eb
Showing 2 changed files with 43 additions and 118 deletions.
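Read in isolation, the pattern this commit adopts looks like the sketch below. This is not the repository's code — the helper name and package are hypothetical — but the metrics calls match the ones in the diff: the node-usage lookup becomes best-effort, and a failed lookup leaves the availability figure unset instead of aborting the caller.

package metricsutil

import (
	"context"
	"math"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

// availableMemoryGiB is a hypothetical helper illustrating the commit's
// approach: usage metrics are best-effort, so a failed lookup reports zero
// ("unknown") rather than propagating the error to the caller.
func availableMemoryGiB(ctx context.Context, metricsClient *metricsv.Clientset, nodeName string, capacityGiB float64) float64 {
	nodeUsage, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		// node usage metrics not being collected is not a fatal error
		return 0
	}
	return capacityGiB - float64(nodeUsage.Usage.Memory().Value())/math.Pow(2, 30)
}

With the refactor, GetNode and GetNodes share a single per-node code path (nodeMetrics, below), so both endpoints degrade the same way when metrics-server is unreachable.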
74 changes: 40 additions & 34 deletions pkg/helmvm/helmvm_node.go
@@ -31,7 +31,31 @@ func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string)
 		return nil, fmt.Errorf("failed to create metrics client: %w", err)
 	}
 
-	nodePods, err := podsOnNode(ctx, client, nodeName)
+	return nodeMetrics(ctx, client, metricsClient, *node)
+}
+
+func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) {
+	namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return nil, fmt.Errorf("list namespaces: %w", err)
+	}
+
+	toReturn := []corev1.Pod{}
+
+	for _, ns := range namespaces.Items {
+		nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)})
+		if err != nil {
+			return nil, fmt.Errorf("list pods on %s in namespace %s: %w", nodeName, ns.Name, err)
+		}
+
+		toReturn = append(toReturn, nsPods.Items...)
+	}
+	return toReturn, nil
+}
+
+// nodeMetrics takes a corev1.Node and gets metrics + status for that node
+func nodeMetrics(ctx context.Context, client kubernetes.Interface, metricsClient *metricsv.Clientset, node corev1.Node) (*types.Node, error) {
+	nodePods, err := podsOnNode(ctx, client, node.Name)
 	if err != nil {
 		return nil, fmt.Errorf("pods per node: %w", err)
 	}
@@ -49,17 +73,18 @@ func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string)
 
 	podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value())
 
-	nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("list pod metrics: %w", err)
-	}
-
-	if nodeMetrics.Usage.Memory() != nil {
-		memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30)
-	}
+	nodeUsageMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
+	if err == nil {
+		if nodeUsageMetrics.Usage.Memory() != nil {
+			memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeUsageMetrics.Usage.Memory().Value())/math.Pow(2, 30)
+		}
 
-	if nodeMetrics.Usage.Cpu() != nil {
-		cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64()
+		if nodeUsageMetrics.Usage.Cpu() != nil {
+			cpuCapacity.Available = cpuCapacity.Capacity - nodeUsageMetrics.Usage.Cpu().AsApproximateFloat64()
+		}
+	} else {
+		// if we can't get metrics, we'll do nothing for now
+		// in the future we may decide to retry or log a warning
 	}
 
 	podCapacity.Available = podCapacity.Capacity - float64(len(nodePods))
@@ -71,10 +96,10 @@ func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string)
 
 	return &types.Node{
 		Name:           node.Name,
-		IsConnected:    isConnected(*node),
-		IsReady:        isReady(*node),
-		IsPrimaryNode:  isPrimary(*node),
-		CanDelete:      node.Spec.Unschedulable && !isConnected(*node),
+		IsConnected:    isConnected(node),
+		IsReady:        isReady(node),
+		IsPrimaryNode:  isPrimary(node),
+		CanDelete:      node.Spec.Unschedulable && !isConnected(node),
 		KubeletVersion: node.Status.NodeInfo.KubeletVersion,
 		CPU:            cpuCapacity,
 		Memory:         memoryCapacity,
@@ -84,22 +109,3 @@ func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string)
 		PodList:        nodePods,
 	}, nil
 }
-
-func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) {
-	namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return nil, fmt.Errorf("list namespaces: %w", err)
-	}
-
-	toReturn := []corev1.Pod{}
-
-	for _, ns := range namespaces.Items {
-		nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName)})
-		if err != nil {
-			return nil, fmt.Errorf("list pods on %s in namespace %s: %w", nodeName, ns.Name, err)
-		}
-
-		toReturn = append(toReturn, nsPods.Items...)
-	}
-	return toReturn, nil
-}
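A side note on podsOnNode as moved above — a sketch only, assuming the service account may list pods at cluster scope, and not a change this commit makes: the same spec.nodeName field selector also works in a single cluster-wide List, which avoids the per-namespace loop and its one-request-per-namespace cost.

package helmvm

import (
	"context"
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// podsOnNodeClusterWide is a hypothetical variant of podsOnNode: one List
// against metav1.NamespaceAll with the same field selector replaces the loop
// over namespaces.
func podsOnNodeClusterWide(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) {
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
	})
	if err != nil {
		return nil, fmt.Errorf("list pods on %s: %w", nodeName, err)
	}
	return pods.Items, nil
}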
87 changes: 3 additions & 84 deletions pkg/helmvm/helmvm_nodes.go
@@ -2,10 +2,6 @@ package helmvm
 
 import (
 	"context"
-	"fmt"
-	"math"
-	"strconv"
-
 	"github.com/pkg/errors"
 	"github.com/replicatedhq/kots/pkg/helmvm/types"
 	"github.com/replicatedhq/kots/pkg/k8sutil"
@@ -34,58 +30,13 @@ func GetNodes(ctx context.Context, client kubernetes.Interface) (*types.HelmVMNodes, error) {
 
 	toReturn := types.HelmVMNodes{}
 
-	nodePods, err := podsPerNode(ctx, client)
-	if err != nil {
-		return nil, errors.Wrap(err, "pods per node")
-	}
-
 	for _, node := range nodes.Items {
-		cpuCapacity := types.CapacityAvailable{}
-		memoryCapacity := types.CapacityAvailable{}
-		podCapacity := types.CapacityAvailable{}
-
-		memoryCapacity.Capacity = float64(node.Status.Capacity.Memory().Value()) / math.Pow(2, 30) // capacity in GB
-
-		cpuCapacity.Capacity, err = strconv.ParseFloat(node.Status.Capacity.Cpu().String(), 64)
-		if err != nil {
-			return nil, errors.Wrapf(err, "parse CPU capacity %q for node %s", node.Status.Capacity.Cpu().String(), node.Name)
-		}
-
-		podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value())
-
-		nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
+		nodeMet, err := nodeMetrics(ctx, client, metricsClient, node)
 		if err != nil {
-			return nil, errors.Wrap(err, "list pod metrics")
-		}
-
-		if nodeMetrics.Usage.Memory() != nil {
-			memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30)
+			return nil, errors.Wrap(err, "node metrics")
 		}
 
-		if nodeMetrics.Usage.Cpu() != nil {
-			cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64()
-		}
-
-		podCapacity.Available = podCapacity.Capacity - float64(nodePods[node.Name])
-
-		nodeLabelArray := []string{}
-		for k, v := range node.Labels {
-			nodeLabelArray = append(nodeLabelArray, fmt.Sprintf("%s:%s", k, v))
-		}
-
-		toReturn.Nodes = append(toReturn.Nodes, types.Node{
-			Name:           node.Name,
-			IsConnected:    isConnected(node),
-			IsReady:        isReady(node),
-			IsPrimaryNode:  isPrimary(node),
-			CanDelete:      node.Spec.Unschedulable && !isConnected(node),
-			KubeletVersion: node.Status.NodeInfo.KubeletVersion,
-			CPU:            cpuCapacity,
-			Memory:         memoryCapacity,
-			Pods:           podCapacity,
-			Labels:         nodeLabelArray,
-			Conditions:     findNodeConditions(node.Status.Conditions),
-		})
+		toReturn.Nodes = append(toReturn.Nodes, *nodeMet)
 	}
 
 	isHelmVM, err := IsHelmVM(client)
@@ -122,38 +73,6 @@ func findNodeConditions(conditions []corev1.NodeCondition) types.NodeConditions
 	return discoveredConditions
 }
 
-// podsPerNode returns a map of node names to the number of pods, across all namespaces
-func podsPerNode(ctx context.Context, client kubernetes.Interface) (map[string]int, error) {
-	namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
-	if err != nil {
-		return nil, errors.Wrap(err, "list namespaces")
-	}
-
-	toReturn := map[string]int{}
-
-	for _, ns := range namespaces.Items {
-		nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
-		if err != nil {
-			return nil, errors.Wrapf(err, "list pods in namespace %s", ns.Name)
-		}
-
-		for _, pod := range nsPods.Items {
-			pod := pod
-			if pod.Spec.NodeName == "" {
-				continue
-			}
-
-			if _, ok := toReturn[pod.Spec.NodeName]; !ok {
-				toReturn[pod.Spec.NodeName] = 0
-			}
-
-			toReturn[pod.Spec.NodeName]++
-		}
-	}
-
-	return toReturn, nil
-}
-
 func isConnected(node corev1.Node) bool {
 	for _, taint := range node.Spec.Taints {
 		if taint.Key == "node.kubernetes.io/unreachable" {
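For reference, the capacity math in both files converts bytes to GiB by dividing by 2^30 (the removed comment in helmvm_nodes.go says "capacity in GB", but the divisor makes it GiB). A quick sanity check with a hypothetical node reporting 16 GiB:

package main

import (
	"fmt"
	"math"
)

func main() {
	capacityBytes := int64(17179869184) // hypothetical 16 GiB node
	fmt.Println(float64(capacityBytes) / math.Pow(2, 30)) // prints 16
}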
