Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Laverya/embedded cluster apis #4069

Closed
wants to merge 8 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/handlers/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,8 @@ func RegisterSessionAuthRoutes(r *mux.Router, kotsStore store.Store, handler KOT
HandlerFunc(middleware.EnforceAccess(policy.ClusterWrite, handler.DeleteHelmVMNode))
r.Name("GetHelmVMNodes").Path("/api/v1/helmvm/nodes").Methods("GET").
HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetHelmVMNodes))
r.Name("GetHelmVMNode").Path("/api/v1/helmvm/node/{nodeName}").Methods("GET").
HandlerFunc(middleware.EnforceAccess(policy.ClusterRead, handler.GetHelmVMNode))

// Prometheus
r.Name("SetPrometheusAddress").Path("/api/v1/prometheus").Methods("POST").
Expand Down
21 changes: 20 additions & 1 deletion pkg/handlers/helmvm_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package handlers
import (
"net/http"

"github.com/gorilla/mux"
"github.com/replicatedhq/kots/pkg/helmvm"
"github.com/replicatedhq/kots/pkg/k8sutil"
"github.com/replicatedhq/kots/pkg/logger"
Expand All @@ -16,11 +17,29 @@ func (h *Handler) GetHelmVMNodes(w http.ResponseWriter, r *http.Request) {
return
}

nodes, err := helmvm.GetNodes(client)
nodes, err := helmvm.GetNodes(r.Context(), client)
if err != nil {
logger.Error(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
JSON(w, http.StatusOK, nodes)
}

// GetHelmVMNode responds with the JSON representation of the single HelmVM
// node named by the {nodeName} route variable. If the Kubernetes clientset
// cannot be obtained or the node lookup fails, the error is logged and the
// response is a bare 500 Internal Server Error.
func (h *Handler) GetHelmVMNode(w http.ResponseWriter, r *http.Request) {
	// fail emits the error response shared by every failure path below.
	fail := func(err error) {
		logger.Error(err)
		w.WriteHeader(http.StatusInternalServerError)
	}

	clientset, err := k8sutil.GetClientset()
	if err != nil {
		fail(err)
		return
	}

	routeVars := mux.Vars(r)
	found, err := helmvm.GetNode(r.Context(), clientset, routeVars["nodeName"])
	if err != nil {
		fail(err)
		return
	}

	JSON(w, http.StatusOK, found)
}
1 change: 1 addition & 0 deletions pkg/handlers/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ type KOTSHandler interface {
DrainHelmVMNode(w http.ResponseWriter, r *http.Request)
DeleteHelmVMNode(w http.ResponseWriter, r *http.Request)
GetHelmVMNodes(w http.ResponseWriter, r *http.Request)
GetHelmVMNode(w http.ResponseWriter, r *http.Request)

// Prometheus
SetPrometheusAddress(w http.ResponseWriter, r *http.Request)
Expand Down
12 changes: 12 additions & 0 deletions pkg/handlers/mock/mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

105 changes: 105 additions & 0 deletions pkg/helmvm/helmvm_node.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package helmvm

import (
"context"
"fmt"
"math"
"strconv"

"github.com/replicatedhq/kots/pkg/helmvm/types"
"github.com/replicatedhq/kots/pkg/k8sutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

// GetNode returns the node called nodeName together with its capacity and
// availability figures for CPU, memory and pods, its labels and conditions,
// and the list of pods currently assigned to it.
//
// Memory figures are bytes divided by 2^30. "Available" for CPU and memory is
// the node's capacity minus the usage reported by the metrics API for that
// node; for pods it is the pod capacity minus the number of pods found on the
// node.
//
// An error is returned when the node cannot be fetched, the cluster config or
// metrics client cannot be built, the node's pods cannot be listed, the CPU
// capacity cannot be parsed as a float, or the node's metrics cannot be
// retrieved.
func GetNode(ctx context.Context, client kubernetes.Interface, nodeName string) (*types.Node, error) {
	node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("get node %s: %w", nodeName, err)
	}

	clientConfig, err := k8sutil.GetClusterConfig()
	if err != nil {
		return nil, fmt.Errorf("failed to get cluster config: %w", err)
	}

	metricsClient, err := metricsv.NewForConfig(clientConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to create metrics client: %w", err)
	}

	nodePods, err := podsOnNode(ctx, client, nodeName)
	if err != nil {
		return nil, fmt.Errorf("pods per node: %w", err)
	}

	cpuCapacity := types.CapacityAvailable{}
	memoryCapacity := types.CapacityAvailable{}
	podCapacity := types.CapacityAvailable{}

	memoryCapacity.Capacity = float64(node.Status.Capacity.Memory().Value()) / math.Pow(2, 30) // bytes -> GiB

	// NOTE(review): ParseFloat only accepts a plain decimal string; a capacity
	// expressed with a suffix (e.g. milli-CPU "3500m") would be rejected here.
	// Confirm node capacities are always reported as whole/decimal cores.
	cpuCapacity.Capacity, err = strconv.ParseFloat(node.Status.Capacity.Cpu().String(), 64)
	if err != nil {
		return nil, fmt.Errorf("parse CPU capacity %q for node %s: %w", node.Status.Capacity.Cpu().String(), node.Name, err)
	}

	podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value())

	// This is a Get of a single node's metrics, so the error says exactly that.
	nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
	if err != nil {
		return nil, fmt.Errorf("get metrics for node %s: %w", node.Name, err)
	}

	if nodeMetrics.Usage.Memory() != nil {
		memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30)
	}

	if nodeMetrics.Usage.Cpu() != nil {
		cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64()
	}

	podCapacity.Available = podCapacity.Capacity - float64(len(nodePods))

	// Labels are flattened to "key:value" strings. Map iteration order is
	// unspecified in Go, so the order of entries may differ between calls.
	nodeLabelArray := make([]string, 0, len(node.Labels))
	for k, v := range node.Labels {
		nodeLabelArray = append(nodeLabelArray, fmt.Sprintf("%s:%s", k, v))
	}

	// Evaluated once and reused for both IsConnected and CanDelete.
	connected := isConnected(*node)

	return &types.Node{
		Name:           node.Name,
		IsConnected:    connected,
		IsReady:        isReady(*node),
		IsPrimaryNode:  isPrimary(*node),
		CanDelete:      node.Spec.Unschedulable && !connected,
		KubeletVersion: node.Status.NodeInfo.KubeletVersion,
		CPU:            cpuCapacity,
		Memory:         memoryCapacity,
		Pods:           podCapacity,
		Labels:         nodeLabelArray,
		Conditions:     findNodeConditions(node.Status.Conditions),
		PodList:        nodePods,
	}, nil
}

// podsOnNode returns every pod, in any namespace, whose spec.nodeName equals
// nodeName.
//
// The pods are fetched with a single all-namespaces list request filtered by a
// spec.nodeName field selector, instead of listing namespaces and then issuing
// one pod list per namespace. Besides saving round trips, this avoids failing
// when a namespace is deleted between the two steps.
//
// The returned slice is always non-nil, so a node without pods is encoded as
// an empty JSON array rather than null.
func podsOnNode(ctx context.Context, client kubernetes.Interface, nodeName string) ([]corev1.Pod, error) {
	pods, err := client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{
		FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
	})
	if err != nil {
		return nil, fmt.Errorf("list pods on %s: %w", nodeName, err)
	}

	// Copy into a freshly made slice: Items may be nil for an empty result.
	toReturn := make([]corev1.Pod, 0, len(pods.Items))
	toReturn = append(toReturn, pods.Items...)
	return toReturn, nil
}
126 changes: 51 additions & 75 deletions pkg/helmvm/helmvm_nodes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,43 @@ package helmvm

import (
"context"
"crypto/tls"
"encoding/json"
"fmt"
"io"
"math"
"net/http"
"os"
"strconv"
"time"

"github.com/pkg/errors"
"github.com/replicatedhq/kots/pkg/helmvm/types"
"github.com/replicatedhq/kots/pkg/logger"
"github.com/replicatedhq/kots/pkg/k8sutil"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
statsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
metricsv "k8s.io/metrics/pkg/client/clientset/versioned"
)

// GetNodes will get a list of nodes with stats
func GetNodes(client kubernetes.Interface) (*types.HelmVMNodes, error) {
nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
func GetNodes(ctx context.Context, client kubernetes.Interface) (*types.HelmVMNodes, error) {
nodes, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "list nodes")
}

clientConfig, err := k8sutil.GetClusterConfig()
if err != nil {
return nil, errors.Wrap(err, "failed to get cluster config")
}

metricsClient, err := metricsv.NewForConfig(clientConfig)
if err != nil {
return nil, errors.Wrap(err, "failed to create metrics client")
}

toReturn := types.HelmVMNodes{}

nodePods, err := podsPerNode(ctx, client)
if err != nil {
return nil, errors.Wrap(err, "pods per node")
}

for _, node := range nodes.Items {
cpuCapacity := types.CapacityAvailable{}
memoryCapacity := types.CapacityAvailable{}
Expand All @@ -44,32 +53,21 @@ func GetNodes(client kubernetes.Interface) (*types.HelmVMNodes, error) {

podCapacity.Capacity = float64(node.Status.Capacity.Pods().Value())

nodeIP := ""
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
nodeIP = address.Address
}
nodeMetrics, err := metricsClient.MetricsV1beta1().NodeMetricses().Get(ctx, node.Name, metav1.GetOptions{})
if err != nil {
return nil, errors.Wrap(err, "list pod metrics")
}

if nodeIP == "" {
logger.Infof("Did not find address for node %s, %+v", node.Name, node.Status.Addresses)
} else {
nodeMetrics, err := getNodeMetrics(nodeIP)
if err != nil {
logger.Infof("Got error retrieving stats for node %q: %v", node.Name, err)
} else {
if nodeMetrics.Node.Memory != nil && nodeMetrics.Node.Memory.AvailableBytes != nil {
memoryCapacity.Available = float64(*nodeMetrics.Node.Memory.AvailableBytes) / math.Pow(2, 30)
}

if nodeMetrics.Node.CPU != nil && nodeMetrics.Node.CPU.UsageNanoCores != nil {
cpuCapacity.Available = cpuCapacity.Capacity - (float64(*nodeMetrics.Node.CPU.UsageNanoCores) / math.Pow(10, 9))
}

podCapacity.Available = podCapacity.Capacity - float64(len(nodeMetrics.Pods))
}
if nodeMetrics.Usage.Memory() != nil {
memoryCapacity.Available = memoryCapacity.Capacity - float64(nodeMetrics.Usage.Memory().Value())/math.Pow(2, 30)
}

if nodeMetrics.Usage.Cpu() != nil {
cpuCapacity.Available = cpuCapacity.Capacity - nodeMetrics.Usage.Cpu().AsApproximateFloat64()
}

podCapacity.Available = podCapacity.Capacity - float64(nodePods[node.Name])

nodeLabelArray := []string{}
for k, v := range node.Labels {
nodeLabelArray = append(nodeLabelArray, fmt.Sprintf("%s:%s", k, v))
Expand Down Expand Up @@ -124,49 +122,36 @@ func findNodeConditions(conditions []corev1.NodeCondition) types.NodeConditions
return discoveredConditions
}

// get kubelet PKI info from /etc/kubernetes/pki/kubelet, use it to hit metrics server at `http://${nodeIP}:10255/stats/summary`
func getNodeMetrics(nodeIP string) (*statsv1alpha1.Summary, error) {
client := http.Client{
Timeout: time.Second,
// podsPerNode returns a map of node names to the number of pods, across all namespaces
func podsPerNode(ctx context.Context, client kubernetes.Interface) (map[string]int, error) {
namespaces, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.Wrap(err, "list namespaces")
}
port := 10255

// only use mutual TLS if client cert exists
_, err := os.ReadFile("/etc/kubernetes/pki/kubelet/client.crt")
if err == nil {
cert, err := tls.LoadX509KeyPair("/etc/kubernetes/pki/kubelet/client.crt", "/etc/kubernetes/pki/kubelet/client.key")
if err != nil {
return nil, errors.Wrap(err, "get client keypair")
}
toReturn := map[string]int{}

// this will leak memory
client.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
InsecureSkipVerify: true,
},
for _, ns := range namespaces.Items {
nsPods, err := client.CoreV1().Pods(ns.Name).List(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.Wrapf(err, "list pods in namespace %s", ns.Name)
}
port = 10250
}

r, err := client.Get(fmt.Sprintf("https://%s:%d/stats/summary", nodeIP, port))
if err != nil {
return nil, errors.Wrapf(err, "get node %s stats", nodeIP)
}
defer r.Body.Close()
for _, pod := range nsPods.Items {
pod := pod
if pod.Spec.NodeName == "" {
continue
}

body, err := io.ReadAll(r.Body)
if err != nil {
return nil, errors.Wrapf(err, "read node %s stats response", nodeIP)
}
if _, ok := toReturn[pod.Spec.NodeName]; !ok {
toReturn[pod.Spec.NodeName] = 0
}

summary := statsv1alpha1.Summary{}
err = json.Unmarshal(body, &summary)
if err != nil {
return nil, errors.Wrapf(err, "parse node %s stats response", nodeIP)
toReturn[pod.Spec.NodeName]++
}
}

return &summary, nil
return toReturn, nil
}

func isConnected(node corev1.Node) bool {
Expand Down Expand Up @@ -201,12 +186,3 @@ func isPrimary(node corev1.Node) bool {

return false
}

func internalIP(node corev1.Node) string {
for _, address := range node.Status.Addresses {
if address.Type == corev1.NodeInternalIP {
return address.Address
}
}
return ""
}
7 changes: 6 additions & 1 deletion pkg/helmvm/node_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,10 @@ import (

// GenerateAddNodeCommand will generate the HelmVM node add command for a primary or secondary node
func GenerateAddNodeCommand(client kubernetes.Interface, primary bool) ([]string, *time.Time, error) {
return nil, nil, nil
tomorrow := time.Now().Add(time.Hour * 24)
if primary {
return []string{"this is a primary join command string", "that can be multiple strings"}, &tomorrow, nil
} else {
return []string{"this is a secondary join command string", "that can be multiple strings"}, &tomorrow, nil
}
}
3 changes: 3 additions & 0 deletions pkg/helmvm/types/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package types

import corev1 "k8s.io/api/core/v1"

type HelmVMNodes struct {
Nodes []Node `json:"nodes"`
HA bool `json:"ha"`
Expand All @@ -18,6 +20,7 @@ type Node struct {
Pods CapacityAvailable `json:"pods"`
Labels []string `json:"labels"`
Conditions NodeConditions `json:"conditions"`
PodList []corev1.Pod `json:"podList"`
}

type CapacityAvailable struct {
Expand Down
Loading
Loading