Skip to content

Commit

Permalink
Embedded cluster working group (#4076)
Browse files Browse the repository at this point in the history
* implement 'IsHelmVM' function based on presence of configmap

if embedded-cluster-config exists in kube-system it is helmvm

add node metrics

add node pod capacity and list of pods to node metrics

implement per-node metrics endpoint with podlist

* generate a node join token (#4072)

* generate a node join token

* two mutexes, and do not restart successful pods

* generate the full node join command

* all controllers are also workers

* allow arbitrary node roles

* role is controller not controller+worker

* change node join command API to accept a list of roles

* Add view node page, add new add node modal (#4065)

* wip: refactor helm cluster page, add view node page

* add new add node modal

* protect routes

* move parenthesis

* use test data for now

* add material react table

* start connecting additional api calls, add test pod data

* add material react table to display pods

* revert change

* uncomment real queries

* fix useparams import

* fix params/routing, update api route

* fix loading/refetch state

* update generate add node request

* add error handling, add mui react table to cluster manage page

* move ts-ignore line

* remove delete functionality for now

* update tanstack query imports

* shorter embedded-cluster join commands (#4075)

* shorter commands wip

* actually return the token

* actually return token

* return entire commands

* handle error

* fix logic

* imports

* update percentages, add pods, fix link, show expiry (#4077)

* fix routing, add missing slash (#4079)

* remove test data, uncomment route protection, fix redirect after license upload (#4081)

* 24, not 60-something, character join tokens (#4080)

* node usage metrics not being collected is not a fatal error (#4082)

* include pod usage metrics (#4083)

add kube-proxy/os/kernel to node metrics

return 'used' not 'available'

* remove pause and delete columns, update styles (#4085)

* remove pause and delete columns until the api code is ready, add loading state, update styles

* update variable names, fix redirect to cluster manage

* include 'sudo ./' at the beginning of the node join command (#4088)

* format pod CPU and memory usage before returning (#4086)

* fixes: clipboard, redirect, columns styling/formatting (#4090)

* make clipboard work with http, fix redirect to cluster manage page

* right align columns, remove placeholders, add namespace column

* determine node roles based on their labels (#4089)

* fix vet (#4084)

* fix vet

* fix tests

* complete mock handler

* more test

* mockgen

* cleanup the k0s join token creation pod after completion (#4091)

* fix redirect, add column types (#4092)

* make labels optional (#4094)

* check for labels

* make labels optional type

* remove the --force flag (#4097)

* chore: change embedded cluster config map namespace (#4100)

* implement 'IsHelmVM' function based on presence of configmap

if embedded-cluster-config exists in kube-system it is helmvm

* change namespace of embedded cluster config

---------

Co-authored-by: Andrew Lavery <[email protected]>

* update redirect to cluster manage page, comment test data (#4096)

* improve logic around initial cluster flow, comment test data

* fix types, redirect on unknown or pending config status if helmvm

* node role labels (#4093)

* node role labels

* handle having no labels on the first node

* f

* include a prefix on the label

* = not :

* fix config redirect, remove unnecessary code (#4104)

* fix config redirect, remove unnecessary code

* linting

* quite a bit of renaming (#4106)

* renaming things to 'embedded-cluster'

* rename frontend

* import ordering

* undo goland's wonderful formatting changes

* function naming

* undo whitespace change

* Apply suggestions from code review

Co-authored-by: Salah Al Saleh <[email protected]>

* address review comments

* return to excluding helmvm from some tests

* another set of mistaken find-replace

---------

Co-authored-by: Star Richardson <[email protected]>
Co-authored-by: Diamon Wiggins <[email protected]>
Co-authored-by: Salah Al Saleh <[email protected]>
  • Loading branch information
4 people authored Oct 24, 2023
1 parent 9c9bc7e commit 1c968ee
Show file tree
Hide file tree
Showing 50 changed files with 2,474 additions and 1,407 deletions.
2 changes: 1 addition & 1 deletion .github/actions/cmx-versions/dist/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7710,4 +7710,4 @@ getClusterVersions();

module.exports = __webpack_exports__;
/******/ })()
;
;
21 changes: 21 additions & 0 deletions migrations/tables/embeded_cluster_tokens.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# SchemaHero table definition for the embedded_cluster_tokens table, which
# stores node-join tokens for the embedded cluster together with the roles
# each token grants.
apiVersion: schemas.schemahero.io/v1alpha4
kind: Table
metadata:
  name: embedded-cluster-tokens
spec:
  name: embedded_cluster_tokens
  requires: []
  schema:
    rqlite:
      strict: true
      primaryKey:
        - token
      columns:
        # The join token itself; acts as the primary key.
        - name: token
          type: text
          constraints:
            notNull: true
        # Roles granted to a node joining with this token — stored as text;
        # exact encoding (e.g. comma-separated vs JSON) is not visible here,
        # confirm against the code that writes this column.
        - name: roles
          type: text
          constraints:
            notNull: true
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helmvm
package embeddedcluster

import (
"context"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helmvm
package embeddedcluster

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion pkg/helmvm/exec.go → pkg/embeddedcluster/exec.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helmvm
package embeddedcluster

import (
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
Expand Down
169 changes: 169 additions & 0 deletions pkg/embeddedcluster/helmvm_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package embeddedcluster

import (
"context"
"fmt"
"math"
"strconv"
"strings"

"github.com/replicatedhq/kots/pkg/embeddedcluster/types"
"github.com/replicatedhq/kots/pkg/k8sutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

// GetNode returns a single node by name, populated with usage stats and the
// list of pods scheduled on it.
func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string) (*types.Node, error) {
	node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("get node %s: %w", nodeName, err)
	}

	restConfig, err := k8sutil.GetClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster config: %w", err)
	}

	// The metrics client talks to the metrics API (metrics-server) for usage data.
	metricsClient, err := metricsv.NewForConfig(restConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics client: %w", err)
	}

	return nodeMetrics(ctx, client, metricsClient, *node)
}

// podsOnNode returns every pod, across all namespaces, that is scheduled to
// the named node.
func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) {
	// A single cluster-wide list (empty namespace) with a spec.nodeName field
	// selector replaces the previous namespace-by-namespace listing, which
	// issued one API call per namespace.
	pods, err := client.CoreV1().Pods("").List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
	})
	if err != nil {
		return nil, fmt.Errorf("list pods on %s: %w", nodeName, err)
	}

	return pods.Items, nil
}

// nodeMetrics builds a types.Node for the given corev1.Node, combining the
// node's capacity, its current usage (when the metrics API is reachable),
// and the pods scheduled on it.
func nodeMetrics(ctx context.Context, client kubernetes.Interface, metricsClient *metricsv.Clientset, node corev1.Node) (*types.Node, error) {
	nodePods, err := podsOnNode(ctx, client, node.Name)
	if err != nil {
		return nil, fmt.Errorf("pods per node: %w", err)
	}

	cpuCapacity := types.CapacityUsed{}
	memoryCapacity := types.CapacityUsed{}
	podCapacity := types.CapacityUsed{}

	// Capacity in GiB.
	memoryCapacity.Capacity = float64(node.Status.Capacity.Memory().Value()) / math.Pow(2, 30)

	cpuCapacity.Capacity, err = strconv.ParseFloat(node.Status.Capacity.Cpu().String(), 64)
	if err != nil {
		return nil, fmt.Errorf("parse CPU capacity %q for node %s: %w", node.Status.Capacity.Cpu().String(), node.Name, err)
	}

	podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value())
	podCapacity.Used = float64(len(nodePods))

	// Node usage is best-effort: if the metrics API is unavailable the node is
	// still returned, just with zero usage numbers. (Previously an empty else
	// branch; dropped per staticcheck SA9003.)
	if nodeUsageMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{}); err == nil {
		if nodeUsageMetrics.Usage.Memory() != nil {
			memoryCapacity.Used = float64(nodeUsageMetrics.Usage.Memory().Value()) / math.Pow(2, 30)
		}
		if nodeUsageMetrics.Usage.Cpu() != nil {
			cpuCapacity.Used = nodeUsageMetrics.Usage.Cpu().AsApproximateFloat64()
		}
	}

	podInfo := make([]types.PodInfo, 0, len(nodePods))
	for _, pod := range nodePods {
		newInfo := types.PodInfo{
			Name:      pod.Name,
			Namespace: pod.Namespace,
			Status:    string(pod.Status.Phase),
		}

		// Pod usage is also best-effort; missing metrics leave CPU/Memory empty.
		if podMetrics, err := metricsClient.MetricsV1beta1().PodMetricses(pod.Namespace).Get(ctx, pod.Name, metav1.GetOptions{}); err == nil {
			podTotalMemory := 0.0
			podTotalCPU := 0.0
			for _, container := range podMetrics.Containers {
				if container.Usage.Memory() != nil {
					podTotalMemory += float64(container.Usage.Memory().Value()) / math.Pow(2, 20) // MiB
				}
				if container.Usage.Cpu() != nil {
					podTotalCPU += container.Usage.Cpu().AsApproximateFloat64() * 1000 // millicores
				}
			}
			newInfo.Memory = fmt.Sprintf("%.1f MB", podTotalMemory)
			newInfo.CPU = fmt.Sprintf("%.1f m", podTotalCPU)
		}

		podInfo = append(podInfo, newInfo)
	}

	// isConnected is computed once and reused for both IsConnected and CanDelete.
	connected := isConnected(node)

	return &types.Node{
		Name:             node.Name,
		IsConnected:      connected,
		IsReady:          isReady(node),
		IsPrimaryNode:    isPrimary(node),
		CanDelete:        node.Spec.Unschedulable && !connected,
		KubeletVersion:   node.Status.NodeInfo.KubeletVersion,
		KubeProxyVersion: node.Status.NodeInfo.KubeProxyVersion,
		OperatingSystem:  node.Status.NodeInfo.OperatingSystem,
		KernelVersion:    node.Status.NodeInfo.KernelVersion,
		CPU:              cpuCapacity,
		Memory:           memoryCapacity,
		Pods:             podCapacity,
		Labels:           nodeRolesFromLabels(node.Labels),
		Conditions:       findNodeConditions(node.Status.Conditions),
		PodList:          podInfo,
	}, nil
}

// nodeRolesFromLabels parses a map of k8s node labels and returns the roles
// of the node. The label scheme appears to be: the role label holds
// "total-<N>" and "<role label>-<i>" holds the i-th role name for i in [0, N)
// — confirm against the code that writes these labels.
func nodeRolesFromLabels(labels map[string]string) []string {
	numRolesStr, ok := labels[types.EMBEDDED_CLUSTER_ROLE_LABEL]
	if !ok {
		// the first node will not initially have a role label, but is a 'controller'
		if val, ok := labels["node-role.kubernetes.io/control-plane"]; ok && val == "true" {
			return []string{"controller"}
		}
		return nil
	}

	numRoles, err := strconv.Atoi(strings.TrimPrefix(numRolesStr, "total-"))
	if err != nil {
		// Log lines previously lacked trailing newlines, running output together.
		fmt.Printf("failed to parse role label %q: %s\n", numRolesStr, err.Error())
		return nil
	}

	// Preallocate to the expected count; the slice is only allocated on the
	// path that actually uses it.
	toReturn := make([]string, 0, numRoles)
	for i := 0; i < numRoles; i++ {
		roleLabel, ok := labels[fmt.Sprintf("%s-%d", types.EMBEDDED_CLUSTER_ROLE_LABEL, i)]
		if !ok {
			fmt.Printf("failed to find role label %d\n", i)
			continue
		}
		toReturn = append(toReturn, roleLabel)
	}

	return toReturn
}
108 changes: 108 additions & 0 deletions pkg/embeddedcluster/helmvm_nodes.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package embeddedcluster

import (
"context"

"github.com/pkg/errors"
"github.com/replicatedhq/kots/pkg/embeddedcluster/types"
"github.com/replicatedhq/kots/pkg/k8sutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

// GetNodes returns stats for every node in the cluster, plus cluster-level
// flags: whether this is an embedded cluster, and whether it is HA.
func GetNodes(ctx context.Context, client kubernetes.Interface) (*types.EmbeddedClusterNodes, error) {
	nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, errors.Wrap(err, "list nodes")
	}

	clientConfig, err := k8sutil.GetClusterConfig()
	if err != nil {
		return nil, errors.Wrap(err, "failed to get cluster config")
	}

	metricsClient, err := metricsv.NewForConfig(clientConfig)
	if err != nil {
		return nil, errors.Wrap(err, "failed to create metrics client")
	}

	toReturn := types.EmbeddedClusterNodes{
		// Preallocate: one entry per node, avoiding append growth.
		Nodes: make([]types.Node, 0, len(nodes.Items)),
	}

	for _, node := range nodes.Items {
		nodeMet, err := nodeMetrics(ctx, client, metricsClient, node)
		if err != nil {
			return nil, errors.Wrap(err, "node metrics")
		}
		toReturn.Nodes = append(toReturn.Nodes, *nodeMet)
	}

	isEmbeddedCluster, err := IsEmbeddedCluster(client)
	if err != nil {
		return nil, errors.Wrap(err, "is embeddedcluster")
	}
	toReturn.IsEmbeddedClusterEnabled = isEmbeddedCluster

	isHA, err := IsHA(client)
	if err != nil {
		return nil, errors.Wrap(err, "is ha")
	}
	toReturn.HA = isHA

	return &toReturn, nil
}

// findNodeConditions flattens a node's condition list into the boolean
// fields of types.NodeConditions. A condition counts as set only when its
// status is ConditionTrue.
func findNodeConditions(conditions []corev1.NodeCondition) types.NodeConditions {
	var found types.NodeConditions
	for _, cond := range conditions {
		isTrue := cond.Status == corev1.ConditionTrue
		switch cond.Type {
		case "MemoryPressure":
			found.MemoryPressure = isTrue
		case "DiskPressure":
			found.DiskPressure = isTrue
		case "PIDPressure":
			found.PidPressure = isTrue
		case "Ready":
			found.Ready = isTrue
		}
	}
	return found
}

// isConnected reports whether the node is reachable, i.e. it does not carry
// the "node.kubernetes.io/unreachable" taint.
func isConnected(node corev1.Node) bool {
	for i := range node.Spec.Taints {
		if node.Spec.Taints[i].Key == "node.kubernetes.io/unreachable" {
			return false
		}
	}
	return true
}

// isReady reports whether the node's "Ready" condition is true. A node with
// no "Ready" condition at all is considered not ready.
func isReady(node corev1.Node) bool {
	for _, cond := range node.Status.Conditions {
		if cond.Type != "Ready" {
			continue
		}
		return cond.Status == corev1.ConditionTrue
	}
	return false
}

// isPrimary reports whether the node is part of the control plane, based on
// the standard kubernetes node-role labels (value is irrelevant; only the
// presence of the key matters, matching the original behavior).
func isPrimary(node corev1.Node) bool {
	// Direct map lookups replace the previous iteration over every label key.
	if _, ok := node.ObjectMeta.Labels["node-role.kubernetes.io/master"]; ok {
		return true
	}
	_, ok := node.ObjectMeta.Labels["node-role.kubernetes.io/control-plane"]
	return ok
}
Loading

0 comments on commit 1c968ee

Please sign in to comment.