[WIP] enable code and test for tkr 1.22 #7

Open: wants to merge 1 commit into master
2 changes: 1 addition & 1 deletion clusterloader2/pkg/execservice/exec_service.go
@@ -37,7 +37,7 @@ const (
 	execDeploymentNamespace = "cluster-loader"
 	execDeploymentName      = "exec-pod"
 	execDeploymentPath      = "pkg/execservice/manifest/exec_deployment.yaml"
-	execPodReplicas         = 3
+	execPodReplicas         = 10
 	execPodSelector         = "feature = exec"
 
 	execPodCheckInterval = 10 * time.Second
12 changes: 7 additions & 5 deletions clusterloader2/pkg/measurement/common/etcd_metrics.go
@@ -193,22 +193,24 @@ func (e *etcdMetricsMeasurement) getEtcdMetrics(host string, provider provider.P
 	// in order to bypass TLS credential requirement when checking etcd /metrics and /health, you
 	// need to provide the insecure http port number to access etcd, http://localhost:2382 for
 	// example.
-	cmd := fmt.Sprintf("curl http://localhost:%d/metrics", port)
-	if samples, err := e.sshEtcdMetrics(cmd, host, provider); err == nil {
-		return samples, nil
-	}
+	// cmd := fmt.Sprintf("curl http://localhost:%d/metrics", port)
+	// if samples, err := e.sshEtcdMetrics(cmd, host, provider); err == nil {
+	// 	return samples, nil
+	// }
 
 	// Use old endpoint if new one fails, "2379" is hard-coded here as well, it is kept as is since
 	// we don't want to bloat the cluster config only for a fall-back attempt.
 	etcdCert, etcdKey, etcdHost := os.Getenv("ETCD_CERTIFICATE"), os.Getenv("ETCD_KEY"), os.Getenv("ETCD_HOST")
+	etcdHost = host // hack: always use the SSH target host, overriding ETCD_HOST
+	cmd := ""
 	if etcdHost == "" {
 		etcdHost = "localhost"
 	}
 	if etcdCert == "" || etcdKey == "" {
 		klog.Warning("empty etcd cert or key, using http")
 		cmd = fmt.Sprintf("curl http://%s:2379/metrics", etcdHost)
 	} else {
-		cmd = fmt.Sprintf("curl -k --cert %s --key %s https://%s:2379/metrics", etcdCert, etcdKey, etcdHost)
+		cmd = fmt.Sprintf("sudo curl -k --cert %s --key %s https://%s:2379/metrics", etcdCert, etcdKey, etcdHost)
 	}
 
 	return e.sshEtcdMetrics(cmd, host, provider)
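Note: a minimal sketch of the command strings this hunk now produces. The cert/key paths and host below are made-up values, and the rationale for the added sudo (the etcd cert and key files on the master are presumably readable only by root) is an assumption, not stated in the PR:

    package main

    import "fmt"

    func main() {
        // Hypothetical values standing in for ETCD_CERTIFICATE, ETCD_KEY and
        // the SSH target host that now always overrides ETCD_HOST.
        etcdCert := "/etc/kubernetes/pki/etcd/peer.crt"
        etcdKey := "/etc/kubernetes/pki/etcd/peer.key"
        etcdHost := "10.0.0.4"

        // TLS path; "sudo" is the new part of the command.
        fmt.Printf("sudo curl -k --cert %s --key %s https://%s:2379/metrics\n", etcdCert, etcdKey, etcdHost)

        // Fallback when no cert/key is configured: plain HTTP on 2379.
        fmt.Printf("curl http://%s:2379/metrics\n", etcdHost)
    }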
2 changes: 2 additions & 0 deletions clusterloader2/pkg/measurement/common/scheduler_latency.go
@@ -293,6 +293,8 @@ func (s *schedulerLatencyMeasurement) sendRequestToScheduler(c clientset.Interfa
 		return "", fmt.Errorf("unknown REST request")
 	}
 
+	// luwang's hack for clusters deployed by Tanzu, where the master is not a
+	// registered node: skip the masterRegistered path below.
+	masterRegistered = false
 	var responseText string
 	if masterRegistered {
 		ctx, cancel := context.WithTimeout(context.Background(), singleRestCallTimeout)
173 changes: 148 additions & 25 deletions clusterloader2/pkg/measurement/common/service_creation_latency.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"fmt"
+	"regexp"
+	"sync"
 	"time"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/equality"
@@ -31,6 +33,7 @@ import (
 	"k8s.io/klog"
 
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/perf-tests/clusterloader2/pkg/errors"
 	"k8s.io/perf-tests/clusterloader2/pkg/execservice"
 	"k8s.io/perf-tests/clusterloader2/pkg/measurement"
 	measurementutil "k8s.io/perf-tests/clusterloader2/pkg/measurement/util"
@@ -48,10 +51,21 @@ const (
 	defaultCheckInterval = 10 * time.Second
 	pingBackoff          = 1 * time.Second
 	pingChecks           = 10
+	// backendThreshold is the minimum number of distinct backends that must show
+	// up in the curl output of each service; 0 disables the check.
+	// regexString is the regular expression used to extract the relevant part
+	// (the backend address) from a service's curl output. The right pattern
+	// depends on the container image used by the services' pods, so
+	// backendThreshold has to be enabled together with a regexString that
+	// matches that image. An example is in testing/svc/.
+	backendThreshold = 0
+	regexString      = ""
 
 	creatingPhase     = "creating"
 	ipAssigningPhase  = "ipAssigning"
 	reachabilityPhase = "reachability"
+	startCheckingPhase             = "startChecking"
+	consecutiveSuccCheckStartPhase = "consecutiveSuccCheckStart"
 )
 
 func init() {
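Note: a minimal, self-contained sketch of how such a regexString is meant to be used. The sample curl output is hypothetical (the kind of response an nginx-style echo image might return); the pattern is the one hard-coded later in Execute:

    package main

    import (
        "fmt"
        "regexp"
    )

    func main() {
        // Hypothetical curl output from one of the service's backend pods.
        out := "Server address: 10.244.1.7:80\nServer name: backend-5d8f9c\n"

        // Capture group 1 is the backend address; the measurement stores one
        // such value per probe and later counts distinct backends per service.
        re := regexp.MustCompile(`Server address: ([0-9.]+):80`)
        if m := re.FindStringSubmatch(out); len(m) >= 2 {
            fmt.Println("backend:", m[1]) // backend: 10.244.1.7
        }
    }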
@@ -63,21 +77,29 @@ func init() {
 func createServiceCreationLatencyMeasurement() measurement.Measurement {
 	return &serviceCreationLatencyMeasurement{
 		selector: measurementutil.NewObjectSelector(),
-		queue:    workerqueue.NewWorkerQueue(serviceCreationLatencyWorkers),
+		// queue is now created in Execute, once parallel_checker_workers is known.
+		// queue: workerqueue.NewWorkerQueue(serviceCreationLatencyWorkers),
 		creationTimes: measurementutil.NewObjectTransitionTimes(serviceCreationLatencyName),
+		svcBackends:   measurementutil.NewSvcBackends(serviceCreationLatencyName),
 		pingCheckers:  checker.NewMap(),
 	}
 }

type serviceCreationLatencyMeasurement struct {
-	selector      *measurementutil.ObjectSelector
-	waitTimeout   time.Duration
-	stopCh        chan struct{}
-	isRunning     bool
-	queue         workerqueue.Interface
-	client        clientset.Interface
-	creationTimes *measurementutil.ObjectTransitionTimes
-	pingCheckers  checker.Map
+	selector         *measurementutil.ObjectSelector
+	waitTimeout      time.Duration
+	stopCh           chan struct{}
+	isRunning        bool
+	queue            workerqueue.Interface
+	client           clientset.Interface
+	creationTimes    *measurementutil.ObjectTransitionTimes
+	svcBackends      *measurementutil.SvcBackends
+	pingCheckers     checker.Map
+	lock             sync.Mutex
+	succeedCheckNums int
+	checkerWorkers   int
+	backendThreshold int
+	regexString      string
+	regexpObj        *regexp.Regexp
 }

 // Execute executes service startup latency measurement actions.
@@ -106,6 +128,33 @@ func (s *serviceCreationLatencyMeasurement) Execute(config *measurement.Config)
 		if err != nil {
 			return nil, err
 		}
+		s.checkerWorkers, err = util.GetIntOrDefault(config.Params, "parallel_checker_workers", serviceCreationLatencyWorkers)
+		if err != nil {
+			return nil, err
+		}
+		s.queue = workerqueue.NewWorkerQueue(s.checkerWorkers)
+		s.succeedCheckNums, err = util.GetIntOrDefault(config.Params, "consecutive_succeed_checks", pingChecks)
+		if err != nil {
+			return nil, err
+		}
+		s.backendThreshold, err = util.GetIntOrDefault(config.Params, "backendThreshold", backendThreshold)
+		if err != nil {
+			return nil, err
+		}
+		s.regexString, err = util.GetStringOrDefault(config.Params, "regexString", regexString)
+		if err != nil {
+			return nil, err
+		}
+		// WIP: hard-coded override that makes the regexString param above a no-op.
+		s.regexString = "Server address: ([0-9.]+):80"
+		if s.regexString != "" {
+			s.regexpObj, err = regexp.Compile(s.regexString)
+			if err != nil {
+				return nil, err
+			}
+		} else {
+			s.regexpObj = nil
+		}
 
 		return nil, s.start()
 	case "waitForReady":
 		return nil, s.waitForReady()
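Note: these Params would come from the clusterloader2 test config. A self-contained sketch of how they are consumed; getIntOrDefault here is a stand-in written for illustration, not the actual util helper, and the values are invented:

    package main

    import "fmt"

    // getIntOrDefault is a hypothetical stand-in for util.GetIntOrDefault:
    // it returns params[key] as an int, or def when the key is absent.
    func getIntOrDefault(params map[string]interface{}, key string, def int) int {
        if v, ok := params[key].(int); ok {
            return v
        }
        return def
    }

    func main() {
        // Hypothetical Params block for the "start" action.
        params := map[string]interface{}{
            "action":                     "start",
            "parallel_checker_workers":   20,
            "consecutive_succeed_checks": 5,
            "backendThreshold":           3,
            "regexString":                "Server address: ([0-9.]+):80",
        }
        fmt.Println(getIntOrDefault(params, "parallel_checker_workers", 10)) // 20
        fmt.Println(getIntOrDefault(params, "backendThreshold", 0))          // 3
    }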
@@ -123,6 +172,8 @@ func (s *serviceCreationLatencyMeasurement) Dispose() {
 		close(s.stopCh)
 	}
 	s.queue.Stop()
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	s.pingCheckers.Dispose()
 }

@@ -181,6 +232,22 @@ var serviceCreationTransitions = map[string]measurementutil.Transition{
 		From: phaseName(creatingPhase, corev1.ServiceTypeClusterIP),
 		To:   phaseName(reachabilityPhase, corev1.ServiceTypeClusterIP),
 	},
+	"create_to_startchecking_clusterip": {
+		From: phaseName(creatingPhase, corev1.ServiceTypeClusterIP),
+		To:   phaseName(startCheckingPhase, corev1.ServiceTypeClusterIP),
+	},
+	"startchecking_to_consecutivestart_clusterip": {
+		From: phaseName(startCheckingPhase, corev1.ServiceTypeClusterIP),
+		To:   phaseName(consecutiveSuccCheckStartPhase, corev1.ServiceTypeClusterIP),
+	},
+	"startchecking_to_available_clusterip": {
+		From: phaseName(startCheckingPhase, corev1.ServiceTypeClusterIP),
+		To:   phaseName(reachabilityPhase, corev1.ServiceTypeClusterIP),
+	},
+	"consecutivestart_to_available_clusterip": {
+		From: phaseName(consecutiveSuccCheckStartPhase, corev1.ServiceTypeClusterIP),
+		To:   phaseName(reachabilityPhase, corev1.ServiceTypeClusterIP),
+	},
 	"create_to_available_nodeport": {
 		From: phaseName(creatingPhase, corev1.ServiceTypeNodePort),
 		To:   phaseName(reachabilityPhase, corev1.ServiceTypeNodePort),
@@ -200,6 +267,8 @@ var serviceCreationTransitions = map[string]measurementutil.Transition{
 }

 func (s *serviceCreationLatencyMeasurement) gather(identifier string) ([]measurement.Summary, error) {
+	var summaries []measurement.Summary
+
 	klog.V(2).Infof("%s: gathering service created latency measurement...", s)
 	if !s.isRunning {
 		return nil, fmt.Errorf("metric %s has not been started", s)
@@ -213,7 +282,32 @@ func (s *serviceCreationLatencyMeasurement) gather(identifier string) ([]measure
 		return nil, err
 	}
 	summary := measurement.CreateSummary(fmt.Sprintf("%s_%s", serviceCreationLatencyName, identifier), "json", content)
-	return []measurement.Summary{summary}, nil
+	summaries = append(summaries, summary)
+
+	if s.backendThreshold != 0 {
+		svcBackendNum := s.svcBackends.CalculateBackendNum()
+		content, err = util.PrettyPrintJSON(svcBackendNum)
+		if err != nil {
+			return nil, err
+		}
+		summary = measurement.CreateSummary(fmt.Sprintf("%s_%s_BackendNums", serviceCreationLatencyName, identifier), "json", content)
+		summaries = append(summaries, summary)
+
+		if len(svcBackendNum) == 0 {
+			err = errors.NewMetricViolationError(
+				"service creation latency",
+				fmt.Sprintf("%s_%s cannot get any backends matching pattern %s", serviceCreationLatencyName, identifier, s.regexString))
+		}
+		for svc, num := range svcBackendNum {
+			if num < s.backendThreshold {
+				klog.Errorf("only found %d backends for svc %s, expected at least %d backends", num, svc, s.backendThreshold)
+				err = errors.NewMetricViolationError(
+					"service creation latency",
+					fmt.Sprintf("some services cannot get at least %d backends", s.backendThreshold))
+			}
+		}
+	}
+	return summaries, err
 }
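Note: a small sketch of the violation check above, assuming CalculateBackendNum returns a map from service ClusterIP to the number of distinct backends seen (the values here are invented):

    package main

    import "fmt"

    func main() {
        // Hypothetical result of s.svcBackends.CalculateBackendNum().
        svcBackendNum := map[string]int{
            "10.96.0.12": 3,
            "10.96.0.13": 1, // below threshold -> metric violation
        }
        backendThreshold := 3
        for svc, num := range svcBackendNum {
            if num < backendThreshold {
                fmt.Printf("only found %d backends for svc %s, expected at least %d\n",
                    num, svc, backendThreshold)
            }
        }
    }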

 func (s *serviceCreationLatencyMeasurement) handleObject(oldObj, newObj interface{}) {
@@ -257,6 +351,8 @@ func (s *serviceCreationLatencyMeasurement) deleteObject(svc *corev1.Service) er
 	if err != nil {
 		return fmt.Errorf("meta key created error: %v", err)
 	}
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	s.pingCheckers.DeleteAndStop(key)
 	return nil
 }
@@ -284,12 +380,17 @@ func (s *serviceCreationLatencyMeasurement) updateObject(svc *corev1.Service) er
 		s.creationTimes.Set(key, phaseName(ipAssigningPhase, svc.Spec.Type), time.Now())
 	}
 	pc := &pingChecker{
-		callerName:    s.String(),
-		svc:           svc,
-		creationTimes: s.creationTimes,
-		stopCh:        make(chan struct{}),
+		callerName:       s.String(),
+		svc:              svc,
+		creationTimes:    s.creationTimes,
+		svcBackends:      s.svcBackends,
+		succeedCheckNums: s.succeedCheckNums,
+		regexpObj:        s.regexpObj,
+		stopCh:           make(chan struct{}),
 	}
 	pc.run()
+	s.lock.Lock()
+	defer s.lock.Unlock()
 	s.pingCheckers.Add(key, pc)
 
 	return nil
@@ -300,10 +401,13 @@ func phaseName(phase string, serviceType corev1.ServiceType) string {
 }
 
 type pingChecker struct {
-	callerName    string
-	svc           *corev1.Service
-	creationTimes *measurementutil.ObjectTransitionTimes
-	stopCh        chan struct{}
+	callerName       string
+	svc              *corev1.Service
+	creationTimes    *measurementutil.ObjectTransitionTimes
+	svcBackends      *measurementutil.SvcBackends
+	stopCh           chan struct{}
+	succeedCheckNums int
+	regexpObj        *regexp.Regexp
 }

 func (p *pingChecker) run() {
@@ -329,25 +433,44 @@ func (p *pingChecker) run() {
 			time.Sleep(pingBackoff)
 			continue
 		}
+		if _, exists := p.creationTimes.Get(key, phaseName(startCheckingPhase, p.svc.Spec.Type)); !exists {
+			p.creationTimes.Set(key, phaseName(startCheckingPhase, p.svc.Spec.Type), time.Now())
+		}
+		if success == 0 {
+			p.creationTimes.Set(key, phaseName(consecutiveSuccCheckStartPhase, p.svc.Spec.Type), time.Now())
+		}
+		msg := ""
+		cmd := ""
 		switch p.svc.Spec.Type {
 		case corev1.ServiceTypeClusterIP:
-			cmd := fmt.Sprintf("curl %s:%d", p.svc.Spec.ClusterIP, p.svc.Spec.Ports[0].Port)
-			_, err = execservice.RunCommand(pod, cmd)
+			// curl options: https://www.mit.edu/afs.new/sipb/user/ssen/src/curl-7.11.1/docs/curl.html
+			// "-m 3" caps each probe at 3 seconds instead of curl's default timeout,
+			// which makes the measured service creation time more precise.
+			cmd = fmt.Sprintf("curl -m 3 -s -S %s:%d", p.svc.Spec.ClusterIP, p.svc.Spec.Ports[0].Port)
+			msg, err = execservice.RunCommand(pod, cmd)
 		case corev1.ServiceTypeNodePort:
-			cmd := fmt.Sprintf("curl %s:%d", pod.Status.HostIP, p.svc.Spec.Ports[0].NodePort)
-			_, err = execservice.RunCommand(pod, cmd)
+			cmd = fmt.Sprintf("curl -m 3 -s -S %s:%d", pod.Status.HostIP, p.svc.Spec.Ports[0].NodePort)
+			msg, err = execservice.RunCommand(pod, cmd)
 		case corev1.ServiceTypeLoadBalancer:
-			cmd := fmt.Sprintf("curl %s:%d", p.svc.Status.LoadBalancer.Ingress[0].IP, p.svc.Spec.Ports[0].Port)
-			_, err = execservice.RunCommand(pod, cmd)
+			cmd = fmt.Sprintf("curl -m 3 -s -S %s:%d", p.svc.Status.LoadBalancer.Ingress[0].IP, p.svc.Spec.Ports[0].Port)
+			msg, err = execservice.RunCommand(pod, cmd)
 		}
 		if err != nil {
+			klog.V(2).Infof("cmd %v in pod %v returned error: %v", cmd, pod.Name, msg)
 			success = 0
 			time.Sleep(pingBackoff)
 			continue
 		}
+		if p.regexpObj != nil {
+			// [luwang-vmware] TODO: find a more generic way to extract the expected value.
+			ip := p.regexpObj.FindStringSubmatch(msg)
+			if len(ip) >= 2 {
+				p.svcBackends.Set(p.svc.Spec.ClusterIP, ip[1])
+			}
+		}
 		success++
-		if success == pingChecks {
+		if success == p.succeedCheckNums {
 			p.creationTimes.Set(key, phaseName(reachabilityPhase, p.svc.Spec.Type), time.Now())
+			klog.V(2).Infof("%v reachability check succeeded", key)
 		}
 	}
 }
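Note: the reachability rule introduced above, in isolation: a service is recorded as reachable only after succeedCheckNums consecutive successful curls, and any failure resets the streak (and restamps the consecutiveSuccCheckStart phase). A compact sketch with simulated probe outcomes:

    package main

    import "fmt"

    func main() {
        const succeedCheckNums = 3 // would come from consecutive_succeed_checks
        success := 0
        // Simulated probe results: the failure in the middle resets the streak.
        for i, ok := range []bool{true, true, false, true, true, true} {
            if !ok {
                success = 0
                continue
            }
            success++
            if success == succeedCheckNums {
                fmt.Println("reachable after probe", i) // reachable after probe 5
            }
        }
    }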
30 changes: 24 additions & 6 deletions clusterloader2/pkg/measurement/common/system_pod_metrics.go
@@ -130,14 +130,30 @@ func getPodMetrics(config *measurement.Config) (*systemPodsMetrics, error) {
 	return extractMetrics(lst), nil
 }
 
-func getPodList(client kubernetes.Interface) (*v1.PodList, error) {
+// getPodList now returns a flat pod slice so that pods from several
+// namespaces can be merged into one list.
+func getPodList(client kubernetes.Interface) ([]v1.Pod, error) {
 	lst, err := client.CoreV1().Pods(systemNamespace).List(context.TODO(), metav1.ListOptions{
 		ResourceVersion: "0", // to read from cache
 	})
 	if err != nil {
 		return nil, err
 	}
-	return lst, nil
+	pods := lst.Items
+
+	namespaces := []string{"avi-system", "tanzu-system", "tkg-system", "tkg-system-public"}
+	for _, ns := range namespaces {
+		nsLst, err := client.CoreV1().Pods(ns).List(context.TODO(), metav1.ListOptions{
+			ResourceVersion: "0", // to read from cache
+		})
+		if err != nil {
+			klog.V(2).Infof("failed to collect pod metrics in ns %v", ns)
+			continue
+		}
+		pods = append(pods, nsLst.Items...)
+	}
+
+	return pods, nil
 }
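Note: a minimal sketch of the merge this change performs; the PodList values are hypothetical stand-ins for the List() results:

    package main

    import (
        "fmt"

        v1 "k8s.io/api/core/v1"
    )

    func main() {
        // Hypothetical List() results: 2 pods in kube-system, 1 in tkg-system.
        kubeSystem := v1.PodList{Items: make([]v1.Pod, 2)}
        tkgSystem := v1.PodList{Items: make([]v1.Pod, 1)}

        // Same shape as the new getPodList: start from the system namespace
        // and append the extra namespaces' pods into one flat slice.
        pods := kubeSystem.Items
        pods = append(pods, tkgSystem.Items...)
        fmt.Println(len(pods)) // 3 -- extractMetrics now ranges over this slice
    }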

 func subtractInitialRestartCounts(metrics *systemPodsMetrics, initMetrics *systemPodsMetrics) {
@@ -230,11 +246,13 @@ func getThresholdOverrides(config *measurement.Config) (map[string]int, error) {
 	return parsed, nil
 }
 
-func extractMetrics(lst *v1.PodList) *systemPodsMetrics {
+func extractMetrics(lst []v1.Pod) *systemPodsMetrics {
 	metrics := systemPodsMetrics{
 		Pods: []podMetrics{},
 	}
-	for _, pod := range lst.Items {
+	for _, pod := range lst {
 		podMetrics := podMetrics{
 			Containers: []containerMetrics{},
 			Name:       pod.Name,