Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#777 from yangjunmyfm192085/metrics…
Browse files Browse the repository at this point in the history
…-server-test8

Implement using /metrics/resource Kubelet endpoint
  • Loading branch information
k8s-ci-robot authored Jun 28, 2021
2 parents 468478e + a2d732e commit b28fa66
Show file tree
Hide file tree
Showing 16 changed files with 434 additions and 1,579 deletions.
5 changes: 1 addition & 4 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,14 @@ verify-deps:
# Generated
# ---------

generated_files=pkg/scraper/client/summary/types_easyjson.go pkg/api/generated/openapi/zz_generated.openapi.go
generated_files=pkg/api/generated/openapi/zz_generated.openapi.go

.PHONY: verify-generated
verify-generated: update-generated
@git diff --exit-code -- $(generated_files)

.PHONY: update-generated
update-generated:
# pkg/scraper/client/summary/types_easyjson.go:
go install -mod=readonly github.com/mailru/easyjson/easyjson
$(GOPATH)/bin/easyjson -all pkg/scraper/client/summary/types.go
# pkg/api/generated/openapi/zz_generated.openapi.go
go install -mod=readonly k8s.io/kube-openapi/cmd/openapi-gen
$(GOPATH)/bin/openapi-gen --logtostderr -i k8s.io/metrics/pkg/apis/metrics/v1beta1,k8s.io/apimachinery/pkg/apis/meta/v1,k8s.io/apimachinery/pkg/api/resource,k8s.io/apimachinery/pkg/version -p pkg/api/generated/openapi/ -O zz_generated.openapi -o $(REPO_DIR) -h $(REPO_DIR)/scripts/boilerplate.go.txt -r /dev/null
Expand Down
2 changes: 0 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/go-openapi/spec v0.20.3
github.com/google/addlicense v0.0.0-20210428195630-6d92264d7170
github.com/google/go-cmp v0.5.5
github.com/mailru/easyjson v0.7.7
github.com/onsi/ginkgo v1.13.0
github.com/onsi/gomega v1.11.0
github.com/prometheus/common v0.25.0
Expand All @@ -21,7 +20,6 @@ require (
k8s.io/klog/hack/tools v0.0.0-20210512110738-02ca14bed863
k8s.io/klog/v2 v2.8.0
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7
k8s.io/kubelet v0.21.1
k8s.io/metrics v0.21.1
sigs.k8s.io/mdtoc v1.0.1
)
5 changes: 1 addition & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -329,9 +329,8 @@ github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czP
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.0/go.mod h1:KAzv3t3aY1NaHWoQz1+4F1ccyAH66Jk7yos7ldAVICs=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
Expand Down Expand Up @@ -878,8 +877,6 @@ k8s.io/klog/v2 v2.8.0 h1:Q3gmuM9hKEjefWFFYF0Mat+YyFJvsUyYuwyNNJ5C9Ts=
k8s.io/klog/v2 v2.8.0/go.mod h1:hy9LJ/NvuK+iVyP4Ehqva4HxZG/oXyIS3n3Jmire4Ec=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7 h1:vEx13qjvaZ4yfObSSXW7BrMc/KQBBT/Jyee8XtLf4x0=
k8s.io/kube-openapi v0.0.0-20210305001622-591a79e4bda7/go.mod h1:wXW5VT87nVfh/iLV8FpR2uDvrFyomxbtb1KivDbvPTE=
k8s.io/kubelet v0.21.1 h1:JeZsCr3GN2Kjg3gn21jLU10RFu0APUK/vdpFWa8P8Nw=
k8s.io/kubelet v0.21.1/go.mod h1:poOR6Iaa5WqytFOp0egXFV8c2XTLFxaXTdj5njUlnVY=
k8s.io/metrics v0.21.1 h1:Xlfrjdda/WWHxG6/h6ACykxb1RByy5EIT862Vc81IYQ=
k8s.io/metrics v0.21.1/go.mod h1:pyDVLsLe++FIGDBFU80NcW4xMFsuiVTWL8Zfi7+PpNo=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
Expand Down
2 changes: 1 addition & 1 deletion manifests/base/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ rules:
resources:
- pods
- nodes
- nodes/stats
- nodes/metrics
- namespaces
- configmaps
verbs:
Expand Down
2 changes: 1 addition & 1 deletion pkg/scraper/client/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ import (

// KubeletMetricsInterface knows how to fetch metrics from the Kubelet
type KubeletMetricsInterface interface {
// GetSummary fetches summary metrics from the given Kubelet
// GetMetrics fetches Resource metrics from the given Kubelet
GetMetrics(ctx context.Context, node *v1.Node) (*storage.MetricsBatch, error)
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Kubernetes Authors.
// Copyright 2021 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -12,13 +12,17 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package summary
package resource

import (
"bytes"
"context"
"fmt"
"io"

"github.com/prometheus/common/expfmt"
"github.com/prometheus/common/model"

"net"
"net/http"
"net/url"
Expand All @@ -27,16 +31,21 @@ import (

"k8s.io/client-go/rest"
"sigs.k8s.io/metrics-server/pkg/scraper/client"

"sigs.k8s.io/metrics-server/pkg/storage"

"github.com/mailru/easyjson"
"sigs.k8s.io/metrics-server/pkg/utils"

corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/metrics-server/pkg/utils"
)

type kubeletClient struct {
defaultPort int
useNodeStatusPort bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
buffers sync.Pool
}

func NewClient(config client.KubeletClientConfig) (*kubeletClient, error) {
transport, err := rest.TransportFor(&config.Client)
if err != nil {
Expand All @@ -60,42 +69,9 @@ func NewClient(config client.KubeletClientConfig) (*kubeletClient, error) {
}, nil
}

type kubeletClient struct {
defaultPort int
useNodeStatusPort bool
client *http.Client
scheme string
addrResolver utils.NodeAddressResolver
buffers sync.Pool
}

var _ client.KubeletMetricsInterface = (*kubeletClient)(nil)

func (kc *kubeletClient) makeRequestAndGetValue(client *http.Client, req *http.Request, value easyjson.Unmarshaler) error {
// TODO(directxman12): support validating certs by hostname
response, err := client.Do(req)
if err != nil {
return err
}
defer response.Body.Close()
b := kc.getBuffer()
defer kc.returnBuffer(b)
_, err = io.Copy(b, response.Body)
if err != nil {
return err
}
body := b.Bytes()
if response.StatusCode != http.StatusOK {
return fmt.Errorf("GET %q: bad status code %q", req.URL, response.Status)
}

err = easyjson.Unmarshal(body, value)
if err != nil {
return fmt.Errorf("GET %q: failed to parse output: %w", req.URL, err)
}
return nil
}

//GetMetrics get metrics from kubelet /metrics/resource endpoint
func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*storage.MetricsBatch, error) {
port := kc.defaultPort
nodeStatusPort := int(node.Status.DaemonEndpoints.KubeletEndpoint.Port)
Expand All @@ -107,23 +83,56 @@ func (kc *kubeletClient) GetMetrics(ctx context.Context, node *corev1.Node) (*st
return nil, err
}
url := url.URL{
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/stats/summary",
RawQuery: "only_cpu_and_memory=true",
Scheme: kc.scheme,
Host: net.JoinHostPort(addr, strconv.Itoa(port)),
Path: "/metrics/resource",
}

req, err := http.NewRequest("GET", url.String(), nil)
if err != nil {
return nil, err
}
summary := &Summary{}
client := kc.client
if client == nil {
client = http.DefaultClient
samples, err := kc.sendRequestDecode(kc.client, req.WithContext(ctx))
if err != nil {
return nil, err
}
return decodeBatch(samples, node.Name), err
}

func (kc *kubeletClient) sendRequestDecode(client *http.Client, req *http.Request) ([]*model.Sample, error) {
response, err := client.Do(req)
if err != nil {
return nil, err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return nil, fmt.Errorf("request failed - %q.", response.Status)
}
b := kc.getBuffer()
defer kc.returnBuffer(b)
_, err = io.Copy(b, response.Body)
if err != nil {
return nil, err
}
dec := expfmt.NewDecoder(b, expfmt.FmtText)
decoder := expfmt.SampleDecoder{
Dec: dec,
Opts: &expfmt.DecodeOptions{},
}

var samples []*model.Sample
for {
var v model.Vector
if err := decoder.Decode(&v); err != nil {
if err == io.EOF {
// Expected loop termination condition.
break
}
return nil, err
}
samples = append(samples, v...)
}
err = kc.makeRequestAndGetValue(client, req.WithContext(ctx), summary)
return decodeBatch(summary), err
return samples, nil
}

func (kc *kubeletClient) getBuffer() *bytes.Buffer {
Expand Down
158 changes: 158 additions & 0 deletions pkg/scraper/client/resource/decode.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2021 The Kubernetes Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package resource

import (
"time"

"github.com/prometheus/common/model"
apitypes "k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"

"sigs.k8s.io/metrics-server/pkg/storage"
)

const (
nameSpaceMetricName = "namespace"
podNameMetricName = "pod"
containerNameMetricName = "container"
nodeCpuUsageMetricName = model.LabelValue("node_cpu_usage_seconds_total")
nodeMemUsageMetricName = model.LabelValue("node_memory_working_set_bytes")
containerCpuUsageMetricName = model.LabelValue("container_cpu_usage_seconds_total")
containerMemUsageMetricName = model.LabelValue("container_memory_working_set_bytes")
)

func decodeBatch(samples []*model.Sample, nodeName string) *storage.MetricsBatch {
if len(samples) == 0 {
return nil
}
res := &storage.MetricsBatch{
Nodes: make(map[string]storage.MetricsPoint),
Pods: make(map[apitypes.NamespacedName]storage.PodMetricsPoint),
}
node := &storage.MetricsPoint{}
pods := make(map[apitypes.NamespacedName]storage.PodMetricsPoint)
for _, sample := range samples {
//parse metrics from sample
switch sample.Metric[model.MetricNameLabel] {
case nodeCpuUsageMetricName:
parseNodeCpuUsageMetrics(sample, node)
case nodeMemUsageMetricName:
parseNodeMemUsageMetrics(sample, node)
case containerCpuUsageMetricName:
parseContainerCpuMetrics(sample, pods)
case containerMemUsageMetricName:
parseContainerMemMetrics(sample, pods)
}
}

if node.Timestamp.IsZero() || node.CumulativeCpuUsed == 0 || node.MemoryUsage == 0 {
klog.V(1).InfoS("Failed getting complete node metric", "node", nodeName, "metric", node)
node = nil
} else {
res.Nodes[nodeName] = *node
}

for podRef, podMetric := range pods {
if len(podMetric.Containers) != 0 {
//drop container metrics when Timestamp is zero

pm := storage.PodMetricsPoint{
Containers: checkContainerMetrics(podMetric),
}
if pm.Containers == nil {
klog.V(1).InfoS("Failed getting complete Pod metric", "pod", klog.KRef(podRef.Namespace, podRef.Name))
} else {
res.Pods[podRef] = pm
}
}
}
return res
}

func getNamespaceName(sample *model.Sample) apitypes.NamespacedName {
return apitypes.NamespacedName{Namespace: string(sample.Metric[nameSpaceMetricName]), Name: string(sample.Metric[podNameMetricName])}
}

func parseNodeCpuUsageMetrics(sample *model.Sample, node *storage.MetricsPoint) {
//unit of node_cpu_usage_seconds_total is second, need to convert to nanosecond
node.CumulativeCpuUsed = uint64(sample.Value * 1e9)
if sample.Timestamp != 0 {
//unit of timestamp is millisecond, need to convert to nanosecond
node.Timestamp = time.Unix(0, int64(sample.Timestamp*1e6))
}
}

func parseNodeMemUsageMetrics(sample *model.Sample, node *storage.MetricsPoint) {
node.MemoryUsage = uint64(sample.Value)
if node.Timestamp.IsZero() && sample.Timestamp != 0 {
//unit of timestamp is millisecond, need to convert to nanosecond
node.Timestamp = time.Unix(0, int64(sample.Timestamp*1e6))
}
}

func parseContainerCpuMetrics(sample *model.Sample, pods map[apitypes.NamespacedName]storage.PodMetricsPoint) {
namespaceName := getNamespaceName(sample)
containerName := string(sample.Metric[containerNameMetricName])
if _, findPod := pods[namespaceName]; !findPod {
pods[namespaceName] = storage.PodMetricsPoint{Containers: make(map[string]storage.MetricsPoint)}
}
if _, findContainer := pods[namespaceName].Containers[containerName]; !findContainer {
pods[namespaceName].Containers[containerName] = storage.MetricsPoint{}
}
//unit of node_cpu_usage_seconds_total is second, need to convert to nanosecond
containerMetrics := pods[namespaceName].Containers[containerName]
containerMetrics.CumulativeCpuUsed = uint64(sample.Value * 1e9)
if sample.Timestamp != 0 {
//unit of timestamp is millisecond, need to convert to nanosecond
containerMetrics.Timestamp = time.Unix(0, int64(sample.Timestamp*1e6))
}
pods[namespaceName].Containers[containerName] = containerMetrics
}

func parseContainerMemMetrics(sample *model.Sample, pods map[apitypes.NamespacedName]storage.PodMetricsPoint) {
namespaceName := getNamespaceName(sample)
containerName := string(sample.Metric[containerNameMetricName])

if _, findPod := pods[namespaceName]; !findPod {
pods[namespaceName] = storage.PodMetricsPoint{Containers: make(map[string]storage.MetricsPoint)}
}
if _, findContainer := pods[namespaceName].Containers[containerName]; !findContainer {
pods[namespaceName].Containers[containerName] = storage.MetricsPoint{}
}
containerMetrics := pods[namespaceName].Containers[containerName]
containerMetrics.MemoryUsage = uint64(sample.Value)
if containerMetrics.Timestamp.IsZero() && sample.Timestamp != 0 {
//unit of timestamp is millisecond, need to convert to nanosecond
containerMetrics.Timestamp = time.Unix(0, int64(sample.Timestamp*1e6))
}
pods[namespaceName].Containers[containerName] = containerMetrics
}

func checkContainerMetrics(podMetric storage.PodMetricsPoint) map[string]storage.MetricsPoint {
podMetrics := make(map[string]storage.MetricsPoint)
for containerName, containerMetric := range podMetric.Containers {
if containerMetric != (storage.MetricsPoint{}) {
//drop metrics when CumulativeCpuUsed or MemoryUsage is zero
if containerMetric.CumulativeCpuUsed == 0 || containerMetric.MemoryUsage == 0 {
klog.V(1).InfoS("Failed getting complete container metric", "containerName", containerName, "containerMetric", containerMetric)
return nil
} else {
podMetrics[containerName] = containerMetric
}
}
}
return podMetrics
}
Loading

0 comments on commit b28fa66

Please sign in to comment.