Refactored slaves pod configuration and added validation for pod name (#8)

Signed-off-by: hiteshwani <[email protected]>
hiteshwani committed Sep 13, 2023
1 parent 6481e39 commit 751b14a
Showing 3 changed files with 129 additions and 115 deletions.
10 changes: 5 additions & 5 deletions contrib/executor/jmeterd/pkg/runner/runner.go
@@ -82,7 +82,7 @@ func (r *JMeterDRunner) Run(ctx context.Context, execution testkube.Execution) (
 			}
 		}
 
-		output.PrintLog(fmt.Sprintf("execution arg before %s", execution.Args))
+		output.PrintLogf("execution arg before %s", execution.Args)
 		execution.Args = execution.Args[:len(execution.Args)-1]
 		output.PrintLogf("execution arg afrer %s", execution.Args)
 		output.PrintLogf("%s It is a directory test - trying to find file from the last executor argument %s in directory %s", ui.IconWorld, scriptName, path)
@@ -134,7 +134,7 @@ func (r *JMeterDRunner) Run(ctx context.Context, execution testkube.Execution) (
 	}
 	// recreate output directory with wide permissions so JMeter can create report files
 	if err = os.Mkdir(outputDir, 0777); err != nil {
-		return *result.Err(errors.Errorf("error creating directory %s: %v", runPath, err)), nil
+		return *result.Err(errors.Wrapf(err, "error creating directory %s", runPath)), nil
 	}
 
 	jtlPath := filepath.Join(outputDir, "report.jtl")
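
A note on the error-handling change in this hunk: errors.Errorf formats the underlying error into a brand-new string, while errors.Wrapf annotates the original error and keeps it retrievable. A minimal runnable sketch of the difference using github.com/pkg/errors, as in the diff (the path is illustrative):

package main

import (
	"fmt"
	"os"

	"github.com/pkg/errors"
)

func main() {
	_, err := os.Open("/no/such/dir")

	flat := errors.Errorf("error creating directory %s: %v", "/no/such/dir", err)
	wrapped := errors.Wrapf(err, "error creating directory %s", "/no/such/dir")

	// Only the wrapped error keeps its cause, so callers can still inspect it.
	fmt.Println(errors.Cause(flat) == err)    // false
	fmt.Println(errors.Cause(wrapped) == err) // true
}
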
@@ -161,17 +161,17 @@ func (r *JMeterDRunner) Run(ctx context.Context, execution testkube.Execution) (

 	slavesClient, err := slaves.NewClient(execution, r.Params, slavesEnvVariables)
 	if err != nil {
-		return *result.WithErrors(errors.Errorf("Getting error while creating slaves client: %v", err)), nil
+		return *result.WithErrors(errors.Wrap(err, "error creating slaves client")), nil
 	}
 
 	//creating slaves provided in SLAVES_COUNT env variable
 	slavesNameIpMap, err := slavesClient.CreateSlaves(ctx)
 	if err != nil {
-		return *result.WithErrors(errors.Errorf("Getting error while creating slaves nodes: %v", err)), nil
+		return *result.WithErrors(errors.Wrap(err, "error creating slaves")), nil
 	}
 	defer slavesClient.DeleteSlaves(ctx, slavesNameIpMap)
 
-	args = append(args, fmt.Sprintf("-R %v", slavesClient.GetSlavesIpString(slavesNameIpMap)))
+	args = append(args, fmt.Sprintf("-R %v", slaves.GetSlavesIpString(slavesNameIpMap)))
 
 	for i := range args {
 		if args[i] == "<envVars>" {
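For context on the last hunk: JMeter's -R option takes a comma-separated list of remote hosts, which is what GetSlavesIpString produces, now as a package-level function in the slaves package instead of a Client method. A self-contained sketch of the resulting argument (names and IPs are invented; map iteration order, and therefore host order, is not deterministic):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// Hypothetical name -> IP map as returned by CreateSlaves.
	podNameIpMap := map[string]string{
		"mytest-slave-1-abc123": "10.244.1.7",
		"mytest-slave-2-abc123": "10.244.1.8",
	}

	podIps := []string{}
	for _, ip := range podNameIpMap {
		podIps = append(podIps, ip)
	}

	// Mirrors slaves.GetSlavesIpString feeding JMeter's remote-host list.
	fmt.Println(fmt.Sprintf("-R %v", strings.Join(podIps, ","))) // e.g. -R 10.244.1.7,10.244.1.8
}
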
119 changes: 100 additions & 19 deletions contrib/executor/jmeterd/pkg/slaves/client.go
@@ -2,19 +2,22 @@ package slaves

 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"strings"
 	"time"
 
+	"github.com/pkg/errors"
+	v1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/kubernetes"
+
 	"github.com/kubeshop/testkube/contrib/executor/jmeterd/pkg/jmeterenv"
 	"github.com/kubeshop/testkube/pkg/api/v1/testkube"
 	"github.com/kubeshop/testkube/pkg/envs"
 	"github.com/kubeshop/testkube/pkg/executor/output"
 	"github.com/kubeshop/testkube/pkg/k8sclient"
-	"github.com/pkg/errors"
-	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/client-go/kubernetes"
 )

 const (
@@ -50,15 +53,15 @@ func NewClient(execution testkube.Execution, envParams envs.Params, slavesEnvVar
 	}, nil
 }
 
-// creating slaves as per count provided in the SLAVES_CLOUNT env variable.
+// CreateSlaves creates slaves as per count provided in the SLAVES_CLOUNT env variable.
 // Default SLAVES_COUNT would be 1 if not provided in the env variables
 func (client *Client) CreateSlaves(ctx context.Context) (map[string]string, error) {
 	slavesCount, err := getSlavesCount(client.envVariables[jmeterenv.SlavesCount])
 	if err != nil {
 		return nil, errors.Errorf("Getting error while fetching slaves count from env variable SLAVES_COUNT : %v", err)
 	}
 
-	output.PrintLog(fmt.Sprintf("Creating Slaves %v Pods", slavesCount))
+	output.PrintLogf("Creating Slaves %v Pods", slavesCount)
 
 	podIPAddressChan := make(chan map[string]string, slavesCount)
 	errorChan := make(chan error, slavesCount)
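
The two buffered channels above feed a simple fan-out: CreateSlaves starts one createSlavePod goroutine per slave and then appears to collect exactly slavesCount results, failing fast on the first error (the spawning and collection loops are collapsed in this view). A minimal, self-contained sketch of that pattern with pod creation stubbed out:

package main

import "fmt"

func main() {
	slavesCount := 3
	podIPAddressChan := make(chan map[string]string, slavesCount)
	errorChan := make(chan error, slavesCount)

	for i := 1; i <= slavesCount; i++ {
		go func(n int) {
			// Stub for createSlavePod: report a name -> IP pair or an error.
			podIPAddressChan <- map[string]string{
				fmt.Sprintf("slave-%d", n): fmt.Sprintf("10.244.0.%d", n),
			}
		}(i)
	}

	podIPAddresses := map[string]string{}
	for i := 0; i < slavesCount; i++ {
		select {
		case err := <-errorChan:
			fmt.Println("slave creation failed:", err)
			return
		case m := <-podIPAddressChan:
			for name, ip := range m {
				podIPAddresses[name] = ip
			}
		}
	}
	fmt.Println(podIPAddresses)
}
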
@@ -84,15 +87,14 @@ func (client *Client) CreateSlaves(ctx context.Context) (map[string]string, erro
 	return podIPAddresses, nil
 }
 
-// created slaves pod and send its ipaddress on the podIPAddressChan channel when pod is in the ready state
+// createSlavePod creates a slave pod and sends its IP address on the podIPAddressChan
+// channel when the pod is in the ready state.
 func (client *Client) createSlavePod(ctx context.Context, currentSlavesCount int, podIPAddressChan chan<- map[string]string, errorChan chan<- error) {
-
-	slavePod, err := getSlavePodConfiguration(client.execution.Name, client.execution, client.envVariables, client.envParams)
+	slavePod, err := client.getSlavePodConfiguration(currentSlavesCount)
 	if err != nil {
 		errorChan <- err
 		return
 	}
-	slavePod.Name = fmt.Sprintf("%s-%v-%v", slavePod.Name, currentSlavesCount, client.execution.Id)
 
 	p, err := client.clientSet.CoreV1().Pods(client.namespace).Create(ctx, slavePod, metav1.CreateOptions{})
 	if err != nil {
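
createSlavePod publishes the pod's name and IP only once the pod is ready, and utils.go (below) provides isPodReady as a wait.ConditionFunc for that purpose. The code that drives the condition is collapsed in this view, so the following is a self-contained sketch of the usual polling shape, with a dummy condition standing in for isPodReady and illustrative interval and timeout values:

package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	start := time.Now()
	// Dummy stand-in for isPodReady: reports ready after two seconds.
	ready := wait.ConditionFunc(func() (bool, error) {
		return time.Since(start) > 2*time.Second, nil
	})
	// PollImmediate drives the condition the way the executor would drive
	// isPodReady (interval and timeout here are illustrative).
	if err := wait.PollImmediate(500*time.Millisecond, 10*time.Second, ready); err != nil {
		fmt.Println("pod never became ready:", err)
		return
	}
	fmt.Println("pod ready after", time.Since(start))
}
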
@@ -120,6 +122,93 @@ func (client *Client) createSlavePod(ctx context.Context, currentSlavesCount int
 	podIPAddressChan <- podNameIpMap
 }
 
+func (client *Client) getSlavePodConfiguration(currentSlavesCount int) (*v1.Pod, error) {
+	runnerExecutionStr, err := json.Marshal(client.execution)
+	if err != nil {
+		return nil, err
+	}
+
+	podName := ValidateAndGetSlavePodName(client.execution.TestName, client.execution.Id, currentSlavesCount)
+	if err != nil {
+		return nil, err
+	}
+	return &v1.Pod{
+		ObjectMeta: metav1.ObjectMeta{
+			Name: podName,
+		},
+		Spec: v1.PodSpec{
+			RestartPolicy: v1.RestartPolicyAlways,
+			InitContainers: []v1.Container{
+				{
+					Name:            "init",
+					Image:           "kubeshop/testkube-init-executor:1.14.3",
+					Command:         []string{"/bin/runner", string(runnerExecutionStr)},
+					Env:             getSlaveRunnerEnv(client.envParams, client.execution),
+					ImagePullPolicy: v1.PullIfNotPresent,
+					VolumeMounts: []v1.VolumeMount{
+						{
+							MountPath: "/data",
+							Name:      "data-volume",
+						},
+					},
+				},
+			},
+			Containers: []v1.Container{
+				{
+					Name:            "main",
+					Image:           "kubeshop/testkube-jmeterd-slaves:999.0.0",
+					Env:             getSlaveConfigurationEnv(client.envVariables),
+					ImagePullPolicy: v1.PullIfNotPresent,
+					Ports: []v1.ContainerPort{
+						{
+							ContainerPort: serverPort,
+							Name:          "server-port",
+						}, {
+							ContainerPort: localPort,
+							Name:          "local-port",
+						},
+					},
+					VolumeMounts: []v1.VolumeMount{
+						{
+							MountPath: "/data",
+							Name:      "data-volume",
+						},
+					},
+					LivenessProbe: &v1.Probe{
+						ProbeHandler: v1.ProbeHandler{
+							TCPSocket: &v1.TCPSocketAction{
+								Port: intstr.FromInt(serverPort),
+							},
+						},
+						FailureThreshold: 3,
+						PeriodSeconds:    5,
+						SuccessThreshold: 1,
+						TimeoutSeconds:   1,
+					},
+					ReadinessProbe: &v1.Probe{
+						ProbeHandler: v1.ProbeHandler{
+							TCPSocket: &v1.TCPSocketAction{
+								Port: intstr.FromInt(serverPort),
+							},
+						},
+						FailureThreshold:    3,
+						InitialDelaySeconds: 10,
+						PeriodSeconds:       5,
+						TimeoutSeconds:      1,
+					},
+				},
+			},
+			Volumes: []v1.Volume{
+				{
+					Name:         "data-volume",
+					VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
+				},
+			},
+		},
+	}, nil
+}
+
 // DeleteSlaves do the cleanup slaves pods after execution of test
 func (client *Client) DeleteSlaves(ctx context.Context, slaveNameIpMap map[string]string) error {
 	for slaveName := range slaveNameIpMap {
 		output.PrintLog(fmt.Sprintf("Deleting slave %v", slaveName))
@@ -132,11 +221,3 @@ func (client *Client) DeleteSlaves(ctx context.Context, slaveNameIpMap map[strin
 	}
 	return nil
 }
-
-func (client *Client) GetSlavesIpString(podNameIpMap map[string]string) string {
-	podIps := []string{}
-	for _, ip := range podNameIpMap {
-		podIps = append(podIps, ip)
-	}
-	return strings.Join(podIps, ",")
-}
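
One detail of the new pod spec worth calling out: the entire testkube.Execution is JSON-serialized and handed to the init container as a single argument to /bin/runner. A self-contained sketch of that hand-off (the struct below is a reduced stand-in, not the real testkube.Execution):

package main

import (
	"encoding/json"
	"fmt"
)

// Stand-in for testkube.Execution, reduced to two illustrative fields.
type execution struct {
	Id       string `json:"id"`
	TestName string `json:"testName"`
}

func main() {
	e := execution{Id: "751b14a", TestName: "jmeter-smoke"}

	runnerExecutionStr, err := json.Marshal(e)
	if err != nil {
		panic(err)
	}

	// Mirrors the init container spec: the execution travels as one argument.
	command := []string{"/bin/runner", string(runnerExecutionStr)}
	fmt.Println(command)
}
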
115 changes: 24 additions & 91 deletions contrib/executor/jmeterd/pkg/slaves/utils.go
@@ -2,18 +2,18 @@ package slaves

 import (
 	"context"
-	"encoding/json"
 	"fmt"
 	"strconv"
+	"strings"
 
-	"github.com/kubeshop/testkube/pkg/api/v1/testkube"
-	"github.com/kubeshop/testkube/pkg/envs"
-	"github.com/kubeshop/testkube/pkg/executor/output"
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/util/intstr"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/kubernetes"
+
+	"github.com/kubeshop/testkube/pkg/api/v1/testkube"
+	"github.com/kubeshop/testkube/pkg/envs"
+	"github.com/kubeshop/testkube/pkg/executor/output"
 )

 const (
@@ -100,92 +100,6 @@ func getSlaveConfigurationEnv(slaveEnv map[string]testkube.Variable) []v1.EnvVar
 	return envVars
 }
 
-func getSlavePodConfiguration(testName string, runnerExecution testkube.Execution, envVariables map[string]testkube.Variable, envParams envs.Params) (*v1.Pod, error) {
-	runnerExecutionStr, err := json.Marshal(runnerExecution)
-	if err != nil {
-		return nil, err
-	}
-
-	podName := fmt.Sprintf("%s-jmeter-slave", testName)
-	if err != nil {
-		return nil, err
-	}
-	return &v1.Pod{
-		ObjectMeta: metav1.ObjectMeta{
-			Name: podName,
-		},
-		Spec: v1.PodSpec{
-			RestartPolicy: v1.RestartPolicyAlways,
-			InitContainers: []v1.Container{
-				{
-					Name:            "init",
-					Image:           "kubeshop/testkube-init-executor:1.14.3",
-					Command:         []string{"/bin/runner", string(runnerExecutionStr)},
-					Env:             getSlaveRunnerEnv(envParams, runnerExecution),
-					ImagePullPolicy: v1.PullIfNotPresent,
-					VolumeMounts: []v1.VolumeMount{
-						{
-							MountPath: "/data",
-							Name:      "data-volume",
-						},
-					},
-				},
-			},
-			Containers: []v1.Container{
-				{
-					Name:            "main",
-					Image:           "kubeshop/testkube-jmeterd-slaves:999.0.0",
-					Env:             getSlaveConfigurationEnv(envVariables),
-					ImagePullPolicy: v1.PullIfNotPresent,
-					Ports: []v1.ContainerPort{
-						{
-							ContainerPort: serverPort,
-							Name:          "server-port",
-						}, {
-							ContainerPort: localPort,
-							Name:          "local-port",
-						},
-					},
-					VolumeMounts: []v1.VolumeMount{
-						{
-							MountPath: "/data",
-							Name:      "data-volume",
-						},
-					},
-					LivenessProbe: &v1.Probe{
-						ProbeHandler: v1.ProbeHandler{
-							TCPSocket: &v1.TCPSocketAction{
-								Port: intstr.FromInt(serverPort),
-							},
-						},
-						FailureThreshold: 3,
-						PeriodSeconds:    5,
-						SuccessThreshold: 1,
-						TimeoutSeconds:   1,
-					},
-					ReadinessProbe: &v1.Probe{
-						ProbeHandler: v1.ProbeHandler{
-							TCPSocket: &v1.TCPSocketAction{
-								Port: intstr.FromInt(serverPort),
-							},
-						},
-						FailureThreshold:    3,
-						InitialDelaySeconds: 10,
-						PeriodSeconds:       5,
-						TimeoutSeconds:      1,
-					},
-				},
-			},
-			Volumes: []v1.Volume{
-				{
-					Name:         "data-volume",
-					VolumeSource: v1.VolumeSource{EmptyDir: &v1.EmptyDirVolumeSource{}},
-				},
-			},
-		},
-	}, nil
-}
-
 func isPodReady(ctx context.Context, c kubernetes.Interface, podName, namespace string) wait.ConditionFunc {
 	return func() (bool, error) {
 		pod, err := c.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
@@ -218,3 +132,22 @@ func getSlavesCount(count testkube.Variable) (int, error) {
 	}
 	return rplicaCount, err
 }
+
+func GetSlavesIpString(podNameIpMap map[string]string) string {
+	podIps := []string{}
+	for _, ip := range podNameIpMap {
+		podIps = append(podIps, ip)
+	}
+	return strings.Join(podIps, ",")
+}
+
+func ValidateAndGetSlavePodName(testName string, executionId string, currentSlaveCount int) string {
+	slavePodName := fmt.Sprintf("%s-slave-%v-%s", testName, currentSlaveCount, executionId)
+	if len(slavePodName) > 64 {
+		//Get first 20 chars from testName name if pod name > 64
+		shortExecutionName := testName[:20]
+		slavePodName = fmt.Sprintf("%s-slave-%v-%s", shortExecutionName, currentSlaveCount, executionId)
+	}
+	return slavePodName
+
+}
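
The new ValidateAndGetSlavePodName caps generated names at 64 characters: Kubernetes object names may be up to 253 characters, but values reused as label values or hostnames are limited to 63, so trimming is a sensible guard. Note that the fallback only shortens testName, so it assumes executionId stays short, and the testName[:20] slice would panic for a test name under 20 characters combined with a very long execution ID. A runnable sketch of the truncation behavior:

package main

import "fmt"

// Mirrors ValidateAndGetSlavePodName from the diff.
func slavePodName(testName, executionId string, currentSlaveCount int) string {
	name := fmt.Sprintf("%s-slave-%v-%s", testName, currentSlaveCount, executionId)
	if len(name) > 64 {
		name = fmt.Sprintf("%s-slave-%v-%s", testName[:20], currentSlaveCount, executionId)
	}
	return name
}

func main() {
	// 40-character test name: the full name is 56 chars and passes through.
	fmt.Println(slavePodName("jmeter-distributed-load-test-scenario-01", "1234567", 1))
	// 60-character test name: the full name exceeds 64, so it is trimmed.
	fmt.Println(slavePodName("jmeter-distributed-load-test-scenario-with-a-very-long-name", "1234567", 1))
}
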
