Skip to content

Commit

Permalink
Merge pull request #1704 from nak0f/e2e-test-configcheck
Browse files Browse the repository at this point in the history
e2e test for config checks
  • Loading branch information
OverOrion authored Apr 2, 2024
2 parents 54a2dd0 + 65cf9e9 commit 4e4283f
Showing 1 changed file with 184 additions and 0 deletions.
184 changes: 184 additions & 0 deletions e2e/fluentd-aggregator/fluentd_aggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -217,3 +218,186 @@ func TestFluentdAggregator_MultiWorker(t *testing.T) {
common.RequireNoError(t, rbacv1.AddToScheme(o.Scheme))
})
}
func TestFluentdAggregator_ConfigChecks(t *testing.T) {
common.Initialize(t)
ns := "testing-2"
releaseNameOverride := "e2e"
outputName := "test-output"
flowName := "test-flow"
common.WithCluster("fluentd-2", t, func(t *testing.T, c common.Cluster) {
setup.LoggingOperator(t, c, setup.LoggingOperatorOptionFunc(func(options *setup.LoggingOperatorOptions) {
options.Namespace = ns
options.NameOverride = releaseNameOverride
}))

ctx := context.Background()

logging := v1beta1.Logging{
ObjectMeta: metav1.ObjectMeta{
Name: "fluentd-aggregator-configchecks-test",
Namespace: ns,
},
Spec: v1beta1.LoggingSpec{
EnableRecreateWorkloadOnImmutableFieldChange: true,
ControlNamespace: ns,
FluentbitSpec: &v1beta1.FluentbitSpec{
Network: &v1beta1.FluentbitNetwork{
Keepalive: utils.BoolPointer(false),
},
},
FluentdSpec: &v1beta1.FluentdSpec{
Image: v1beta1.ImageSpec{
Tag: "v1.16-base",
},
Resources: corev1.ResourceRequirements{
Limits: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("500m"),
corev1.ResourceMemory: resource.MustParse("200M"),
},
Requests: corev1.ResourceList{
corev1.ResourceCPU: resource.MustParse("250m"),
corev1.ResourceMemory: resource.MustParse("50M"),
},
},
BufferVolumeMetrics: &v1beta1.Metrics{},
Scaling: &v1beta1.FluentdScaling{
Replicas: 1,
Drain: v1beta1.FluentdDrainConfig{
Enabled: true,
},
},
Workers: 1,
},
},
}
common.RequireNoError(t, c.GetClient().Create(ctx, &logging))
output := v1beta1.Output{
ObjectMeta: metav1.ObjectMeta{
Name: outputName,
Namespace: ns,
},
Spec: v1beta1.OutputSpec{
FileOutput: &output.FileOutputConfig{
Path: "/tmp/logs/${tag}/%Y/%m/%d.%H.%M",
Append: true,
Buffer: &output.Buffer{
Type: "file",
Timekey: "1m",
TimekeyWait: "10s",
},
},
},
}

producerLabels := map[string]string{
"my-unique-label": "log-producer",
}

common.RequireNoError(t, c.GetClient().Create(ctx, &output))
flow := v1beta1.Flow{
ObjectMeta: metav1.ObjectMeta{
Name: flowName,
Namespace: ns,
},
Spec: v1beta1.FlowSpec{
Match: []v1beta1.Match{
{
Select: &v1beta1.Select{
Labels: producerLabels,
},
},
},
LocalOutputRefs: []string{output.Name},
},
}
common.RequireNoError(t, c.GetClient().Create(ctx, &flow))

aggregatorLabels := map[string]string{
"app.kubernetes.io/name": "fluentd",
"app.kubernetes.io/component": "fluentd",
}
operatorLabels := map[string]string{
"app.kubernetes.io/name": releaseNameOverride,
}

go setup.LogProducer(t, c.GetClient(), setup.LogProducerOptionFunc(func(options *setup.LogProducerOptions) {
options.Namespace = ns
options.Labels = producerLabels
}))

require.Eventually(t, func() bool {
if operatorRunning := cond.AnyPodShouldBeRunning(t, c.GetClient(), client.MatchingLabels(operatorLabels))(); !operatorRunning {
t.Log("waiting for the operator")
return false
}
if producerRunning := cond.AnyPodShouldBeRunning(t, c.GetClient(), client.MatchingLabels(producerLabels))(); !producerRunning {
t.Log("waiting for the producer")
return false
}
if aggregatorRunning := cond.AnyPodShouldBeRunning(t, c.GetClient(), client.MatchingLabels(aggregatorLabels)); !aggregatorRunning() {
t.Log("waiting for the aggregator")
return false
}

return logging.Status.ProblemsCount == 0
}, 5*time.Minute, 3*time.Second)

t.Logf("Breaking File Output with an invalid config")
patch := client.MergeFrom(output.DeepCopy())
output.Spec.FileOutput.Path = "/tmp/zzz"
common.RequireNoError(t, c.GetClient().Patch(ctx, &output, patch))
require.Eventually(t, func() bool {
common.RequireNoError(t, c.GetClient().Get(ctx, utils.ObjectKeyFromObjectMeta(&logging), &logging))
if logging.Status.ProblemsCount > 0 {
for _, problem := range logging.Status.Problems {
match, err := regexp.MatchString(`^Configuration with checksum (.+) has failed. .*`, problem)
common.RequireNoError(t, err)
if match {
t.Logf("Found the problem in Logging status: %v", logging.Status)
return true
}
}
}
t.Logf("Waiting for the problem to appear in Logging status: %v", logging.Status.Problems)
return false
}, 5*time.Minute, 3*time.Second)

t.Logf("Fixing Output")
patch = client.MergeFrom(output.DeepCopy())
output.Spec.FileOutput.Path = "/tmp/logs/${tag}/%Y/%m/%d.%H.%M"
common.RequireNoError(t, c.GetClient().Patch(ctx, &output, patch))
require.Eventually(t, func() bool {
common.RequireNoError(t, c.GetClient().Get(ctx, utils.ObjectKeyFromObjectMeta(&logging), &logging))
if logging.Status.ProblemsCount > 0 {
for _, problem := range logging.Status.Problems {
match, err := regexp.MatchString(`^Configuration with checksum (.+) has failed. .*`, problem)
common.RequireNoError(t, err)
if match {
t.Logf("Waiting for the problem to be cleared in Logging status: %v", logging.Status.Problems)
return false
}
}
}
t.Logf("Problem cleared in Logging status: %v", logging.Status)
return true
}, 5*time.Minute, 3*time.Second)
}, func(t *testing.T, c common.Cluster) error {
path := filepath.Join(TestTempDir, fmt.Sprintf("cluster-%s.log", t.Name()))
t.Logf("Printing cluster logs to %s", path)
return c.PrintLogs(common.PrintLogConfig{
Namespaces: []string{ns, "default"},
FilePath: path,
Limit: 100 * 1000,
})
}, func(o *cluster.Options) {
if o.Scheme == nil {
o.Scheme = runtime.NewScheme()
}
common.RequireNoError(t, v1beta1.AddToScheme(o.Scheme))
common.RequireNoError(t, apiextensionsv1.AddToScheme(o.Scheme))
common.RequireNoError(t, appsv1.AddToScheme(o.Scheme))
common.RequireNoError(t, batchv1.AddToScheme(o.Scheme))
common.RequireNoError(t, corev1.AddToScheme(o.Scheme))
common.RequireNoError(t, rbacv1.AddToScheme(o.Scheme))
})
}

0 comments on commit 4e4283f

Please sign in to comment.