Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: load test #1240

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[*.ts]
indent_style = space
indent_size = 2
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ vendor
config-db
scraped/

tests/log.txt

.config-db
ginkgo.report

Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ test: manifests generate fmt vet envtest ## Run tests.
test-prod: manifests generate fmt vet envtest ## Run tests.
$(MAKE) gotest-prod

test-load: envtest ## Run tests.
kubectl delete events --all -n testns
kubectl delete deployments --all -n testns
kubectl delete pods --all -n testns
$(MAKE) gotest-load

.PHONY: gotest
gotest:
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test ./... -coverprofile cover.out
Expand All @@ -71,6 +77,10 @@ gotest:
gotest-prod:
KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -tags rustdiffgen ./... -coverprofile cover.out

.PHONY: gotest-load
gotest-load:
make -C fixtures/load k6
LOAD_TEST=1 KUBEBUILDER_ASSETS="$(shell $(ENVTEST) use $(ENVTEST_K8S_VERSION) --bin-dir $(LOCALBIN) -p path)" go test -v ./tests -coverprofile cover.out

.PHONY: env
env: envtest ## Run tests.
Expand Down
4 changes: 2 additions & 2 deletions cmd/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,12 @@ func run(ctx dutyContext.Context, args []string) error {
utilruntime.Must(v1.AddToScheme(scheme))

registerJobs(ctx, args)
scrapers.StartEventListener(ctx)
scrapers.StartEventListener(dutyCtx)

go serve(dutyCtx)
go tableUpdatesHandler(dutyCtx)

return launchKopper(ctx)
return launchKopper(dutyCtx)
}

func launchKopper(ctx dutyContext.Context) error {
Expand Down
3 changes: 2 additions & 1 deletion fixtures/load/Makefile
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
.PHONY:
k6:
go install go.k6.io/xk6/cmd/xk6@latest
xk6 build --with github.com/grafana/xk6-kubernetes
xk6 build --with github.com/grafana/xk6-kubernetes --with github.com/avitalique/xk6-file@latest

.PHONY:
run: k6
kubectl delete pods --all -n testns
./k6 run load.ts --insecure-skip-tls-verify
66 changes: 60 additions & 6 deletions fixtures/load/load.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ import { Kubernetes } from 'k6/x/kubernetes';
import k6 from 'k6';
import encoding from 'k6/encoding'
import http from 'k6/http';
import file from 'k6/x/file';

export const options = {
thresholds: {
http_req_failed: ['rate<0.01'],
//http_req_failed: ['rate<0.01'],
http_req_duration: ['p(99)<1000'],
},
scenarios: {
Expand Down Expand Up @@ -122,13 +123,54 @@ const podSpec = {
}
}

// Minimal Namespace manifest for the load-test namespace. Applied first so
// the podinfo pods and the nginx deployment below have somewhere to live.
// `ns` is declared earlier in this file — TODO confirm it matches the
// "testns" namespace cleaned up by the Makefile targets.
const namespaceSpec = {
    apiVersion: "v1",
    kind: "Namespace",
    metadata: {
        name: ns,
    }
}

// Deployment manifest used to exercise scale-up/scale-down change detection:
// it is applied with 3 replicas, then re-applied with replicas=1 (see the
// scaledown step in the default function), and the Go load test asserts that
// a corresponding "ScalingReplicaSet" change is recorded within a minute.
const deploymentSpec = {
    apiVersion: "apps/v1",
    kind: "Deployment",
    metadata: {
        name: "nginx",
        namespace: ns,
    },
    spec: {
        replicas: 3,
        selector: {
            matchLabels: {
                app: "nginx"
            }
        },
        template: {
            metadata: {
                labels: {
                    app: "nginx"
                }
            },
            spec: {
                containers: [
                    {
                        name: "nginx",
                        // alpine tag keeps the image pull small for load runs
                        image: "nginx:alpine",
                    }
                ]
            }
        }
    }
};

let count = 2
export default function () {
let count = 10
export default function() {
kubernetes = new Kubernetes();
console.log(`Connected to ${kubernetes.kubernetes.config.host}`)


kubernetes.apply(JSON.stringify(namespaceSpec))

// Create 200 pods
for (let i = 0; i < count; i++) {
const podName = `podinfo-${i}`;
Expand All @@ -152,14 +194,16 @@ export default function () {

// Crash 20 random pods over 1 minute
const interval = 3; // seconds between crashes
const podsToCrash = 1;
const podsToCrash = 2;

for (let i = 0; i < podsToCrash; i++) {
const randomPodIndex = Math.floor(Math.random() * count);
const podName = `podinfo-${randomPodIndex}`;


console.log(`Crashing pod: ${podName}`);
// Write this to file
console.log(`Crashing pod: ${podName} at ${new Date().toLocaleString()}`);
file.appendString('log.txt', `${podName},crash,${new Date().toISOString()}\n`)

try {
let response = proxyGet(kubernetes.get("Pod", podName, ns), "panic", 9898)
Expand All @@ -176,7 +220,17 @@ export default function () {
// List all pods to verify
const pods = kubernetes.list("Pod", ns);
console.log(`${pods.length} Pods found:`);
pods.map(function (pod) {
pods.map(function(pod) {
console.log(` ${pod.metadata.name} ${pod.status.phase}: restarts=${pod.status.containerStatuses[0].restartCount}`);
});

// Create deployment to scale up and down
console.log(`Creating nginx deployment`);
kubernetes.apply(JSON.stringify(deploymentSpec))

k6.sleep(5);
const deployment1Replica = JSON.parse(JSON.stringify(deploymentSpec));
deployment1Replica.spec.replicas = 1;
file.appendString('log.txt', `${deploymentSpec.metadata.name},scaledown,${new Date().toISOString()}\n`)
kubernetes.apply(JSON.stringify(deployment1Replica))
}
5 changes: 4 additions & 1 deletion scrapers/cron.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func Stop() {
scrapeJobScheduler.Stop()
}

func SyncScrapeConfigs(sc context.Context) {
func InitSemaphoreWeights(sc context.Context) {
if globalScraperSempahore == nil {
globalScraperSempahore = semaphore.NewWeighted(int64(sc.Properties().Int("scraper.concurrency", ScraperConcurrency)))
}
Expand All @@ -71,7 +71,10 @@ func SyncScrapeConfigs(sc context.Context) {
"trivy": semaphore.NewWeighted(int64(sc.Properties().Int("scraper.trivy.concurrency", 1))),
}
}
}

func SyncScrapeConfigs(sc context.Context) {
InitSemaphoreWeights(sc)
DefaultSchedule = sc.Properties().String("scrapers.default.schedule", DefaultSchedule)
j := &job.Job{
Name: "ConfigScraperSync",
Expand Down
196 changes: 196 additions & 0 deletions tests/load_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package tests

import (
"os"
"os/exec"
"strings"
"testing"
"time"

"github.com/flanksource/commons/logger"
"github.com/flanksource/config-db/api"
v1 "github.com/flanksource/config-db/api/v1"
"github.com/flanksource/config-db/scrapers"
"github.com/flanksource/duty/context"
"github.com/flanksource/duty/models"
"github.com/flanksource/duty/tests/setup"
ginkgo "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
)

// TestLoad is the `go test` entrypoint that hands control to Ginkgo, which
// runs the "Load" suite defined in this package. RegisterFailHandler must be
// called before RunSpecs so Gomega failures abort the current spec.
func TestLoad(t *testing.T) {
	RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t, "Load")
}

var (
	// DefaultContext is the duty context shared by every spec. It is first
	// initialised in BeforeSuite and then rebound with a real Kubernetes
	// client/config in the load test's BeforeAll.
	DefaultContext context.Context
)

type ChangeTimes []struct {
ChangeType string
CreatedAt time.Time
Name string
Details string
}

// Suite-level setup/teardown: setup.BeforeSuiteFn provisions the shared duty
// test context (database, etc.) consumed by every spec via DefaultContext;
// setup.AfterSuiteFn tears it down.
var _ = ginkgo.BeforeSuite(func() {
	DefaultContext = setup.BeforeSuiteFn()

})

var _ = ginkgo.AfterSuite(setup.AfterSuiteFn)

// Load Test: end-to-end latency check for config-db change detection.
// It scrapes a real cluster, runs the k6 script (fixtures/load/load.ts) that
// crashes podinfo pods and scales an nginx deployment while appending its
// actions to log.txt, then asserts that matching config_changes rows were
// recorded within one minute of each k6-logged event.
//
// ginkgo.Ordered: the specs share scraperCtx and must run in order
// (scrape first, then start the consumer and verify).
var _ = ginkgo.Describe("Load Test", ginkgo.Ordered, func() {

	// scraperCtx carries the scrape config plus the duty context; built once
	// in BeforeAll and reused by both specs.
	var scraperCtx api.ScrapeContext
	ginkgo.BeforeAll(func() {
		// Skip load test for normal flow
		if _, ok := os.LookupEnv("LOAD_TEST"); !ok {
			ginkgo.Skip("Skipping load test, env: LOAD_TEST not set")
		}

		// This is required since duty.Setup uses a Fake Kubernetes Client by default
		kubeconfig := clientcmd.RecommendedHomeFile
		config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
		if err != nil {
			// NOTE(review): panic in a Ginkgo node aborts the whole process;
			// Expect(err).To(BeNil()) would report more cleanly — left as-is.
			panic(err)
		}

		clientset, err := kubernetes.NewForConfig(config)
		if err != nil {
			panic(err)
		}
		// Rebind the suite context to the real cluster client.
		DefaultContext = DefaultContext.WithKubernetes(clientset, config)

		// Watch only the kinds the k6 script mutates; 30s schedule keeps the
		// periodic scrape frequent relative to the 1-minute latency bound.
		scrapeConfig := v1.ScrapeConfig{
			Spec: v1.ScraperSpec{
				Schedule: "@every 30s",
				Kubernetes: []v1.Kubernetes{{
					ClusterName: "load-test",
					Watch: []v1.KubernetesResourceToWatch{
						{ApiVersion: "v1", Kind: "Namespace"},
						{ApiVersion: "v1", Kind: "Pod"},
					},
				}},
			},
		}
		scraperCtx = api.NewScrapeContext(DefaultContext).WithScrapeConfig(&scrapeConfig)

		// Semaphores are normally initialised by SyncScrapeConfigs' job; do it
		// explicitly here since we call RunScraper directly.
		scrapers.InitSemaphoreWeights(scraperCtx.Context)
	})
	ginkgo.It("should start scrape once", func() {
		// One full synchronous scrape to seed config_items from the cluster.
		_, err := scrapers.RunScraper(scraperCtx)
		Expect(err).To(BeNil())

		var count int64
		Expect(scraperCtx.DB().Table("config_items").Where("type LIKE 'Kubernetes::%'").Count(&count).Error).To(BeNil())
		Expect(count).ToNot(Equal(int64(0)))
	})

	ginkgo.It("should start consumer", func() {
		// NOTE(review): no-op statement; presumably keeps the models import
		// referenced — confirm or remove.
		_ = models.ConfigChange{}
		// Start the scheduled scrape/consumer job for this config.
		err := scrapers.SyncScrapeJob(scraperCtx)
		Expect(err).To(BeNil())

		// Drop any event log from a previous run; error ignored on purpose
		// (the file may not exist). k6's script appends to tests/log.txt.
		os.Remove("log.txt")

		// Give the watch/consumer time to settle before generating load.
		time.Sleep(15 * time.Second)
		logger.Infof("Exec k6")
		cmd := exec.Command("../fixtures/load/k6", "run", "../fixtures/load/load.ts", "--insecure-skip-tls-verify")
		err = cmd.Run()
		if err != nil {
			logger.Errorf("Error is %v", err)
			panic(err)
		}

		logger.Infof("End k6")
		// Allow the scraper/consumer to persist the changes produced by k6.
		time.Sleep(2 * time.Minute)

		var count int64
		Expect(scraperCtx.DB().Table("config_changes").Count(&count).Error).To(BeNil())
		Expect(count).ToNot(Equal(int64(0)))

		// All changes recorded against the podinfo-* pods k6 crashed.
		var podinfoChanges ChangeTimes
		err = scraperCtx.DB().Raw(`
			SELECT cc.change_type, cc.created_at, ci.name FROM config_changes cc
			INNER JOIN config_items ci ON cc.config_id = ci.id
			WHERE ci.name LIKE 'podinfo%'
		`).Scan(&podinfoChanges).Error

		Expect(err).To(BeNil())

		// Keep, per pod, the created_at of its diff change (later rows
		// overwrite earlier ones; no ORDER BY here, so which diff wins is
		// DB-dependent — TODO confirm intended).
		podinfoChangeDiffs := make(map[string]time.Time)
		for _, c := range podinfoChanges {
			logger.Infof("Change is %v", c)
			if c.ChangeType == v1.ChangeTypeDiff {
				podinfoChangeDiffs[c.Name] = c.CreatedAt
			}
		}

		// Parse k6's event log: CSV lines of "<name>,<event>,<RFC3339 time>".
		f, err := os.ReadFile("log.txt")
		Expect(err).To(BeNil())
		lines := strings.Split(string(f), "\n")

		k6CrashTime := make(map[string]time.Time)
		deployTimes := make(map[string]time.Time)
		for _, line := range lines {
			if strings.TrimSpace(line) == "" {
				continue
			}
			parts := strings.Split(line, ",")
			t, err := time.Parse(time.RFC3339, parts[2])
			Expect(err).To(BeNil())

			if parts[1] == "crash" {
				k6CrashTime[parts[0]] = t
			} else if parts[1] == "scaledown" {
				deployTimes[parts[0]] = t
			}

			logger.Infof("N=%s t=%s", parts[0], t)
		}

		// Each crash must be reflected as a diff change within one minute.
		for k, v := range k6CrashTime {
			changeLog, ok := podinfoChangeDiffs[k]
			if !ok {
				panic("not found " + k)
			}
			td := changeLog.Sub(v)
			logger.Infof("Delta for %s is %v", k, td)
			Expect(td).To(BeNumerically("<", time.Minute))
		}

		// Changes on the nginx deployment, oldest first.
		var nginxChanges ChangeTimes
		err = scraperCtx.DB().Raw(`
			SELECT cc.change_type, cc.created_at, ci.name FROM config_changes cc
			INNER JOIN config_items ci ON cc.config_id = ci.id
			WHERE ci.name LIKE 'nginx%'
			ORDER BY cc.created_at ASC
		`).Scan(&nginxChanges).Error

		Expect(err).To(BeNil())

		// The first ScalingReplicaSet event is the initial 3-replica rollout;
		// skip it (nginxCounter != 0) and keep the latest timestamp per name,
		// which corresponds to the replicas=1 scaledown.
		nginxChangeDiffs := make(map[string]time.Time)
		nginxCounter := 0
		for _, c := range nginxChanges {
			logger.Infof("Nginx change is %v", c)
			if c.ChangeType == "ScalingReplicaSet" && nginxCounter != 0 {
				nginxChangeDiffs[c.Name] = c.CreatedAt
			}
			nginxCounter += 1
		}

		// The scaledown must likewise be detected within one minute.
		for k, v := range deployTimes {
			changeLog, ok := nginxChangeDiffs[k]
			if !ok {
				panic("not found " + k)
			}
			td := changeLog.Sub(v)
			logger.Infof("Delta for %s is %v", k, td)
			Expect(td).To(BeNumerically("<", time.Minute))
		}
	})
})
Loading