diff --git a/cmd/k8s-netperf/k8s-netperf.go b/cmd/k8s-netperf/k8s-netperf.go index e093ed5c..44fcbebf 100644 --- a/cmd/k8s-netperf/k8s-netperf.go +++ b/cmd/k8s-netperf/k8s-netperf.go @@ -254,7 +254,7 @@ var rootCmd = &cobra.Command{ if pavail { for i, npr := range sr.Results { - if len(npr.ClientNodeInfo.Hostname) > 0 && len(npr.ServerNodeInfo.Hostname) > 0 { + if len(npr.ClientNodeInfo.NodeName) > 0 && len(npr.ServerNodeInfo.NodeName) > 0 { sr.Results[i].ClientMetrics, _ = metrics.QueryNodeCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) sr.Results[i].ServerMetrics, _ = metrics.QueryNodeCPU(npr.ServerNodeInfo, pcon, npr.StartTime, npr.EndTime) sr.Results[i].ClientPodCPU, _ = metrics.TopPodCPU(npr.ClientNodeInfo, pcon, npr.StartTime, npr.EndTime) @@ -465,8 +465,6 @@ func executeWorkload(nc config.Config, npr.EndTime = time.Now() npr.ClientNodeInfo = s.ClientNodeInfo npr.ServerNodeInfo = s.ServerNodeInfo - npr.ServerNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ServerNodeInfo.Hostname) - npr.ClientNodeLabels, _ = k8s.GetNodeLabels(s.ClientSet, s.ClientNodeInfo.Hostname) return npr } diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index c3719b29..a06b890e 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -19,36 +19,36 @@ const ltcyMetric = "usec" // Doc struct of the JSON document to be indexed type Doc struct { - UUID string `json:"uuid"` - Timestamp time.Time `json:"timestamp"` - HostNetwork bool `json:"hostNetwork"` - Driver string `json:"driver"` - Parallelism int `json:"parallelism"` - Profile string `json:"profile"` - Duration int `json:"duration"` - Service bool `json:"service"` - Local bool `json:"local"` - Virt bool `json:"virt"` - AcrossAZ bool `json:"acrossAZ"` - Samples int `json:"samples"` - Messagesize int `json:"messageSize"` - Burst int `json:"burst"` - Throughput float64 `json:"throughput"` - Latency float64 `json:"latency"` - TputMetric string `json:"tputMetric"` - LtcyMetric string `json:"ltcyMetric"` - TCPRetransmit float64 `json:"tcpRetransmits"` - UDPLossPercent float64 `json:"udpLossPercent"` - ToolVersion string `json:"toolVersion"` - ToolGitCommit string `json:"toolGitCommit"` - Metadata result.Metadata `json:"metadata"` - ServerNodeCPU metrics.NodeCPU `json:"serverCPU"` - ServerPodCPU []metrics.PodCPU `json:"serverPods"` - ClientNodeCPU metrics.NodeCPU `json:"clientCPU"` - ClientPodCPU []metrics.PodCPU `json:"clientPods"` - ClientNodeLabels map[string]string `json:"clientNodeLabels"` - ServerNodeLabels map[string]string `json:"serverNodeLabels"` - Confidence []float64 `json:"confidence"` + UUID string `json:"uuid"` + Timestamp time.Time `json:"timestamp"` + HostNetwork bool `json:"hostNetwork"` + Driver string `json:"driver"` + Parallelism int `json:"parallelism"` + Profile string `json:"profile"` + Duration int `json:"duration"` + Service bool `json:"service"` + Local bool `json:"local"` + Virt bool `json:"virt"` + AcrossAZ bool `json:"acrossAZ"` + Samples int `json:"samples"` + Messagesize int `json:"messageSize"` + Burst int `json:"burst"` + Throughput float64 `json:"throughput"` + Latency float64 `json:"latency"` + TputMetric string `json:"tputMetric"` + LtcyMetric string `json:"ltcyMetric"` + TCPRetransmit float64 `json:"tcpRetransmits"` + UDPLossPercent float64 `json:"udpLossPercent"` + ToolVersion string `json:"toolVersion"` + ToolGitCommit string `json:"toolGitCommit"` + Metadata result.Metadata `json:"metadata"` + ServerNodeCPU metrics.NodeCPU `json:"serverCPU"` + ServerPodCPU []metrics.PodCPU `json:"serverPods"` + ClientNodeCPU metrics.NodeCPU `json:"clientCPU"` + ClientPodCPU []metrics.PodCPU `json:"clientPods"` + Confidence []float64 `json:"confidence"` + ServerNodeInfo metrics.NodeInfo `json:"serverNodeInfo"` + ClientNodeInfo metrics.NodeInfo `json:"clientNodeInfo"` } // Connect returns a client connected to the desired cluster. @@ -89,29 +89,31 @@ func BuildDocs(sr result.ScenarioResults, uuid string) ([]interface{}, error) { } c := []float64{lo, hi} d := Doc{ - UUID: uuid, - Timestamp: time, - ToolVersion: sr.Version, - ToolGitCommit: sr.GitCommit, - Driver: r.Driver, - HostNetwork: r.HostNetwork, - Parallelism: r.Parallelism, - Profile: r.Profile, - Duration: r.Duration, - Virt: sr.Virt, - Samples: r.Samples, - Service: r.Service, - Messagesize: r.MessageSize, - Burst: r.Burst, - TputMetric: r.Metric, - LtcyMetric: ltcyMetric, - ServerNodeCPU: r.ServerMetrics, - ClientNodeCPU: r.ClientMetrics, - ServerPodCPU: r.ServerPodCPU.Results, - ClientPodCPU: r.ClientPodCPU.Results, - Metadata: sr.Metadata, - AcrossAZ: r.AcrossAZ, - Confidence: c, + UUID: uuid, + Timestamp: time, + ToolVersion: sr.Version, + ToolGitCommit: sr.GitCommit, + Driver: r.Driver, + HostNetwork: r.HostNetwork, + Parallelism: r.Parallelism, + Profile: r.Profile, + Duration: r.Duration, + Virt: sr.Virt, + Samples: r.Samples, + Service: r.Service, + Messagesize: r.MessageSize, + Burst: r.Burst, + TputMetric: r.Metric, + LtcyMetric: ltcyMetric, + ServerNodeCPU: r.ServerMetrics, + ClientNodeCPU: r.ClientMetrics, + ServerPodCPU: r.ServerPodCPU.Results, + ClientPodCPU: r.ClientPodCPU.Results, + Metadata: sr.Metadata, + AcrossAZ: r.AcrossAZ, + Confidence: c, + ClientNodeInfo: r.ClientNodeInfo, + ServerNodeInfo: r.ServerNodeInfo, } UDPLossPercent, e := result.Average(r.LossSummary) if e != nil { diff --git a/pkg/k8s/kubernetes.go b/pkg/k8s/kubernetes.go index 8a629613..8d681f8d 100644 --- a/pkg/k8s/kubernetes.go +++ b/pkg/k8s/kubernetes.go @@ -12,6 +12,7 @@ import ( corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/rbac/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" @@ -153,16 +154,11 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } // Get node count - nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker="}) + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{LabelSelector: "node-role.kubernetes.io/worker=,node-role.kubernetes.io/infra!="}) if err != nil { return err } - ncount := 0 - for _, node := range nodes.Items { - if _, ok := node.Labels["node-role.kubernetes.io/infra"]; !ok { - ncount++ - } - } + ncount := len(nodes.Items) log.Debugf("Number of nodes with role worker: %d", ncount) if (s.HostNetwork || !s.NodeLocal) && ncount < 2 { return fmt.Errorf(" not enough nodes with label worker= to execute test (current number of nodes: %d).", ncount) @@ -218,7 +214,10 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { return err } } - s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdp) + s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdp.Labels).String()) + if err != nil { + return err + } } // Create iperf service @@ -410,7 +409,6 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { } sdpHost.PodAntiAffinity = antiAffinity } - if ncount > 1 { if s.HostNetwork { if !s.VM { @@ -431,9 +429,12 @@ func BuildSUT(client *kubernetes.Clientset, s *config.PerfScenarios) error { if err != nil { return err } - s.ServerNodeInfo, _ = GetPodNodeInfo(client, sdp) + s.ServerNodeInfo, err = GetPodNodeInfo(client, labels.Set(sdp.Labels).String()) + if err != nil { + return err + } if !s.NodeLocal { - s.ClientNodeInfo, _ = GetPodNodeInfo(client, cdpAcross) + s.ClientNodeInfo, err = GetPodNodeInfo(client, labels.Set(cdpAcross.Labels).String()) } if err != nil { return err @@ -459,17 +460,17 @@ func launchServerVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA return err } if strings.Contains(name, "host") { - perf.ServerHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) + perf.ServerHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) if err != nil { return err } } else { - perf.Server, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) + perf.Server, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) if err != nil { return err } } - perf.ServerNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) + perf.ServerNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", serverRole)) return nil } @@ -485,17 +486,17 @@ func launchClientVM(perf *config.PerfScenarios, name string, podAff *corev1.PodA return err } if strings.Contains(name, "host") { - perf.ClientHost, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name)) + perf.ClientHost, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name)) if err != nil { return err } } else { - perf.ClientAcross, err = GetNakedPods(perf.ClientSet, fmt.Sprintf("app=%s", name)) + perf.ClientAcross, err = GetPods(perf.ClientSet, fmt.Sprintf("app=%s", name)) if err != nil { return err } } - perf.ClientNodeInfo, _ = GetNakedPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name)) + perf.ClientNodeInfo, _ = GetPodNodeInfo(perf.ClientSet, fmt.Sprintf("app=%s", name)) return nil } @@ -525,7 +526,7 @@ func deployDeployment(client *kubernetes.Clientset, dp DeploymentParams) (corev1 return pods, err } // Retrieve pods which match the server/client role labels - pods, err = GetPods(client, dp) + pods, err = GetPods(client, labels.Set(dp.Labels).String()) if err != nil { return pods, err } @@ -544,7 +545,7 @@ func WaitForReady(c *kubernetes.Clientset, dp DeploymentParams) (bool, error) { for event := range dw.ResultChan() { d, ok := event.Object.(*appsv1.Deployment) if !ok { - fmt.Println("❌ Issue with the Deployment") + log.Error("❌ Issue with the Deployment") } if d.Name == dp.Name { if d.Status.ReadyReplicas == 1 { @@ -660,46 +661,8 @@ func CreateDeployment(dp DeploymentParams, client *kubernetes.Clientset) (*appsv return dc.Create(context.TODO(), deployment, metav1.CreateOptions{}) } -// GetNodeLabels Return Labels for a specific node -func GetNodeLabels(c *kubernetes.Clientset, node string) (map[string]string, error) { - log.Debugf("Looking for Node labels for node - %s", node) - nodeInfo, err := c.CoreV1().Nodes().Get(context.TODO(), node, metav1.GetOptions{}) - if err != nil { - return nil, err - } - return nodeInfo.GetLabels(), nil -} - -// GetPodNodeInfo collects the node information for a specific pod -func GetPodNodeInfo(c *kubernetes.Clientset, dp DeploymentParams) (metrics.NodeInfo, error) { - var info metrics.NodeInfo - d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) - if err != nil { - return info, fmt.Errorf("❌ Failure to capture deployment: %v", err) - } - selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return info, fmt.Errorf("❌ Failure to capture deployment label: %v", err) - } - pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) - if err != nil { - return info, fmt.Errorf("❌ Failure to capture pods: %v", err) - } - for pod := range pods.Items { - p := pods.Items[pod] - if pods.Items[pod].DeletionTimestamp != nil { - continue - } else { - info.IP = p.Status.HostIP - info.Hostname = p.Spec.NodeName - } - } - log.Debugf("%s Running on %s with IP %s", d.Name, info.Hostname, info.IP) - return info, nil -} - -// GetNakedPodNodeInfo collects the node information for a specific pod -func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) { +// GetPodNodeInfo collects the node information for a node running a pod with a specific label +func GetPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInfo, error) { var info metrics.NodeInfo listOpt := metav1.ListOptions{ LabelSelector: label, @@ -709,65 +672,29 @@ func GetNakedPodNodeInfo(c *kubernetes.Clientset, label string) (metrics.NodeInf if err != nil { return info, fmt.Errorf("❌ Failure to capture pods: %v", err) } - for pod := range pods.Items { - p := pods.Items[pod] - if pods.Items[pod].DeletionTimestamp != nil { - continue - } else { - info.IP = p.Status.HostIP - info.Hostname = p.Spec.NodeName - } - } - log.Debugf("Machine with lablel %s is Running on %s with IP %s", label, info.Hostname, info.IP) - return info, nil -} - -// GetPods searches for a specific set of pods from DeploymentParms -// It returns a PodList if the deployment is found. -// NOTE : Since we can update the replicas to be > 1, is why I return a PodList. -func GetPods(c *kubernetes.Clientset, dp DeploymentParams) (corev1.PodList, error) { - d, err := c.AppsV1().Deployments(dp.Namespace).Get(context.TODO(), dp.Name, metav1.GetOptions{}) - npl := corev1.PodList{} + info.NodeName = pods.Items[0].Spec.NodeName + info.IP = pods.Items[0].Status.HostIP + node, err := c.CoreV1().Nodes().Get(context.TODO(), info.NodeName, metav1.GetOptions{}) if err != nil { - return npl, fmt.Errorf("❌ Failure to capture deployment: %v", err) + return info, err } - selector, err := metav1.LabelSelectorAsSelector(d.Spec.Selector) - if err != nil { - return npl, fmt.Errorf("❌ Failure to capture deployment label: %v", err) - } - pods, err := c.CoreV1().Pods(dp.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: selector.String(), FieldSelector: "status.phase=Running"}) - if err != nil { - return npl, fmt.Errorf("❌ Failure to capture pods: %v", err) - } - for pod := range pods.Items { - if pods.Items[pod].DeletionTimestamp != nil { - continue - } else { - npl.Items = append(npl.Items, pods.Items[pod]) - } - } - return npl, nil + info.NodeSystemInfo = node.Status.NodeInfo + log.Debugf("Machine with label %s is Running on %s with IP %s", label, info.NodeName, info.IP) + return info, nil } -// GetNakedPods when we deploy pods without a higher-level controller like deployment -func GetNakedPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) { - npl := corev1.PodList{} +// GetPods returns pods with a specific label +func GetPods(c *kubernetes.Clientset, label string) (corev1.PodList, error) { listOpt := metav1.ListOptions{ LabelSelector: label, + FieldSelector: "status.phase=Running", } log.Infof("Looking for pods with label %s", fmt.Sprint(label)) pods, err := c.CoreV1().Pods(namespace).List(context.TODO(), listOpt) if err != nil { - return npl, fmt.Errorf("❌ Failure to capture pods: %v", err) - } - for pod := range pods.Items { - if pods.Items[pod].DeletionTimestamp != nil { - continue - } else { - npl.Items = append(npl.Items, pods.Items[pod]) - } + return *pods, fmt.Errorf("❌ Failure to capture pods: %v", err) } - return npl, nil + return *pods, nil } diff --git a/pkg/metrics/system.go b/pkg/metrics/system.go index 9f542184..5c9cda61 100644 --- a/pkg/metrics/system.go +++ b/pkg/metrics/system.go @@ -11,14 +11,15 @@ import ( "github.com/cloud-bulldozer/go-commons/prometheus" "github.com/cloud-bulldozer/k8s-netperf/pkg/logging" "github.com/prometheus/common/model" + corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/clientcmd" ) // NodeInfo stores the node metadata like IP and Hostname type NodeInfo struct { - IP string - Hostname string - NodeName string + IP string `json:"ip"` + NodeName string `json:"nodeName"` + corev1.NodeSystemInfo } // NodeCPU stores CPU information for a specific Node @@ -127,7 +128,7 @@ func QueryNodeCPU(node NodeInfo, conn PromConnect, start time.Time, end time.Tim query := fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s:.*\"}[2m])) * 100)", node.IP) if conn.OpenShift { // OpenShift changes the instance in its metrics. - query = fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s\"}[2m])) * 100)", node.Hostname) + query = fmt.Sprintf("(avg by(mode) (rate(node_cpu_seconds_total{instance=~\"%s\"}[2m])) * 100)", node.NodeName) } logging.Debugf("Prom Query : %s", query) val, err := conn.Client.QueryRange(query, start, end, time.Minute) diff --git a/pkg/results/result.go b/pkg/results/result.go index a3d64766..6b701ef1 100644 --- a/pkg/results/result.go +++ b/pkg/results/result.go @@ -44,8 +44,6 @@ type Data struct { ServerMetrics metrics.NodeCPU ClientPodCPU metrics.PodValues ServerPodCPU metrics.PodValues - ClientNodeLabels map[string]string - ServerNodeLabels map[string]string } // ScenarioResults each scenario could have multiple results