From 9c41202c26d0b8c348015bbc1c575b3243823dfb Mon Sep 17 00:00:00 2001 From: Ziwen Ning Date: Sat, 16 Sep 2023 16:16:37 -0700 Subject: [PATCH] ci: add fluentd e2e tests Signed-off-by: Ziwen Ning --- .github/workflows/ci.yaml | 29 +++++++++++++++ Makefile | 4 ++ e2e/awslogs_test.go | 78 ++++++++++++--------------------------- e2e/fluentd_test.go | 67 +++++++++++++++++++++++++++++++++ e2e/main_test.go | 44 ++++++++++++++++++++++ 5 files changed, 167 insertions(+), 55 deletions(-) create mode 100644 e2e/fluentd_test.go diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 8e61aa1..527822a 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -80,6 +80,35 @@ jobs: run: sudo make build - name: test-e2e run: sudo -E make test-e2e-for-awslogs # containerd interaction requires sudo and aws cloudwatch interaction requires passing env vars + e2e-tests-for-fluentd: + strategy: + fail-fast: false + matrix: + go: [ '1.20', '1.21' ] + os: [ ubuntu-latest ] # TODO: Add Windows e2e tests: https://github.com/aws/shim-loggers-for-containerd/issues/68 + name: E2E tests / fluentd / ${{ matrix.os }} / Go ${{ matrix.go }} + runs-on: ${{ matrix.os }} + permissions: + id-token: write + contents: write + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v4 + with: + go-version: ${{ matrix.go }} + cache: false + - name: install and start containerd + shell: bash + run: sudo scripts/install-containerd + - name: start fluentd local endpoint + shell: bash + run: | # Fluentd container is not using root user so need 777 to make it writable. https://docs.github.com/en/actions/using-github-hosted-runners/about-github-hosted-runners#docker-container-filesystem + sudo mkdir -m 777 $GITHUB_WORKSPACE/fluentd-logs + docker run -d -p 24224:24224 -p 24224:24224/udp -v $GITHUB_WORKSPACE/fluentd-logs:/fluentd/log public.ecr.aws/docker/library/fluentd:v1.16-debian-1 + - name: build + run: sudo make build + - name: test-e2e + run: sudo make test-e2e-for-fluentd # containerd interaction requires sudo go-mod-tidy-check: runs-on: ubuntu-latest steps: diff --git a/Makefile b/Makefile index ebe8e58..462b172 100644 --- a/Makefile +++ b/Makefile @@ -35,6 +35,10 @@ test-e2e: test-e2e: go test -timeout 30m ./e2e -test.v -ginkgo.v --binary "$(AWS_CONTAINERD_LOGGERS_BINARY)" --log-driver "awslogs" +.PHONY: test-e2e-for-fluentd +test-e2e-for-fluentd: + go test -timeout 30m ./e2e -test.v -ginkgo.v --binary "$(AWS_CONTAINERD_LOGGERS_BINARY)" --log-driver "fluentd" + .PHONY: coverage coverage: go test -tags unit $(shell go list ./... | grep -v e2e) -coverprofile=test-coverage.out diff --git a/e2e/awslogs_test.go b/e2e/awslogs_test.go index 056dad4..ec7b2c3 100644 --- a/e2e/awslogs_test.go +++ b/e2e/awslogs_test.go @@ -5,15 +5,11 @@ package e2e import ( "context" - "fmt" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs" - "github.com/containerd/containerd" "github.com/containerd/containerd/cio" - "github.com/containerd/containerd/namespaces" - "github.com/containerd/containerd/oci" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) @@ -28,31 +24,22 @@ const ( testAwslogsRegion = "us-west-2" testAwsLogsStream = "test-stream" testAwsLogsGroup = "test-shim-logger" - testAwsLogsMessage = "test-e2e-log" ) var testAwslogs = func() { // These tests are run in serial because we only define one log driver instance. - ginkgo.Describe("awslogs shim logger", ginkgo.Serial, func() { //nolint:typecheck + ginkgo.Describe("awslogs shim logger", ginkgo.Serial, func() { var cwClient *cloudwatchlogs.Client ginkgo.BeforeEach(func() { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithRegion(testAwslogsRegion)) gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) cwClient = cloudwatchlogs.NewFromConfig(cfg) - _, err = cwClient.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{ - LogStreamName: aws.String(testAwsLogsStream), - LogGroupName: aws.String(testAwsLogsGroup), - }) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + cleanupAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream) }) ginkgo.AfterEach(func() { - _, err := cwClient.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{ - LogStreamName: aws.String(testAwsLogsStream), - LogGroupName: aws.String(testAwsLogsGroup), - }) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + cleanupAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream) }) - ginkgo.It("should send logs to awslogs log driver", func() { //nolint:typecheck + ginkgo.It("should send logs to awslogs log driver", func() { args := map[string]string{ logDriverTypeKey: awslogsDriverName, containerIdKey: testContainerId, @@ -63,45 +50,26 @@ var testAwslogs = func() { awslogsStreamKey: testAwsLogsStream, } creator := cio.BinaryIO(*Binary, args) - // Create a new client connected to the containerd daemon - client, err := containerd.New(containerdAddress) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - defer client.Close() - // Create a new context with a customized namespace - ctx := namespaces.WithNamespace(context.Background(), "testAwslogs") - // Pull an image - image, err := client.Pull(ctx, testImage, containerd.WithPullUnpack) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - // Create a new container with the pulled image - container, err := client.NewContainer(ctx, testContainerId, containerd.WithImage(image), - containerd.WithNewSnapshot("test-snapshot", image), containerd.WithNewSpec(oci.WithImageConfig(image), - oci.WithProcessArgs("/bin/sh", "-c", fmt.Sprintf("echo '%s'", testAwsLogsMessage)))) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - defer container.Delete(ctx, containerd.WithSnapshotCleanup) //nolint:errcheck // testing only - // Create a new task from the container and start it - task, err := container.NewTask(ctx, creator) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - defer task.Delete(ctx) //nolint:errcheck // testing only - - err = task.Start(ctx) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + sendTestLogByContainerd(creator, testLog) + validateTestLogsInAwslogs(cwClient, testAwsLogsGroup, testAwsLogsStream, testLog) + }) + }) +} - statusC, err := task.Wait(ctx) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - // Waiting for the task to finish - status := <-statusC - code, _, err := status.Result() - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(code).Should(gomega.Equal(uint32(0))) +func validateTestLogsInAwslogs(client *cloudwatchlogs.Client, logGroupName string, logStreamName string, testLog string) { + cwOutput, err := client.GetLogEvents(context.TODO(), &cloudwatchlogs.GetLogEventsInput{ + LogStreamName: aws.String(logGroupName), + LogGroupName: aws.String(logStreamName), + Limit: aws.Int32(1), + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(*cwOutput.Events[0].Message).Should(gomega.Equal(testLog)) +} - // Validating in AWS logs - cwOutput, err := cwClient.GetLogEvents(context.TODO(), &cloudwatchlogs.GetLogEventsInput{ - LogStreamName: aws.String(testAwsLogsStream), - LogGroupName: aws.String(testAwsLogsGroup), - Limit: aws.Int32(1), - }) - gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) - gomega.Expect(*cwOutput.Events[0].Message).Should(gomega.Equal(testAwsLogsMessage)) - }) +func cleanupAwslogs(client *cloudwatchlogs.Client, logGroupName string, logStreamName string) { + _, err := client.DeleteLogStream(context.TODO(), &cloudwatchlogs.DeleteLogStreamInput{ + LogStreamName: aws.String(logGroupName), + LogGroupName: aws.String(logStreamName), }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) } diff --git a/e2e/fluentd_test.go b/e2e/fluentd_test.go new file mode 100644 index 0000000..ff4124b --- /dev/null +++ b/e2e/fluentd_test.go @@ -0,0 +1,67 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +package e2e + +import ( + "bufio" + "encoding/json" + "os" + "path/filepath" + "strings" + + "github.com/containerd/containerd/cio" + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" +) + +const ( + fluentdLogDirName = "./../fluentd-logs" +) + +var testFluentd = func() { + // These tests are run in serial because we only define one log driver instance. + ginkgo.Describe("fluentd shim logger", ginkgo.Serial, func() { + ginkgo.It("should send logs to fluentd log driver", func() { + args := map[string]string{ + logDriverTypeKey: fluentdDriverName, + containerIdKey: testContainerId, + containerNameKey: testContainerName, + "--verbose": "true", + } + creator := cio.BinaryIO(*Binary, args) + sendTestLogByContainerd(creator, testLog) + validateTestLogsInFluentd(fluentdLogDirName, testLog) + }) + }) +} + +func validateTestLogsInFluentd(dirName string, testLog string) { + var fileName string + err := filepath.Walk(dirName, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if strings.HasPrefix(info.Name(), "data.") && strings.HasSuffix(info.Name(), ".log") && info.Name() != "data.log" { + fileName = path + } + return nil + }) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(fileName).ShouldNot(gomega.Equal("")) + file, err := os.Open(fileName) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer file.Close() + var lastLine string + scanner := bufio.NewScanner(file) + for scanner.Scan() { + lastLine = scanner.Text() + } + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + contentParts := strings.Split(lastLine, "\t") + gomega.Expect(len(contentParts)).Should(gomega.Equal(3)) + var logContent map[string]string + err = json.Unmarshal([]byte(contentParts[2]), &logContent) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(logContent["log"]).Should(gomega.Equal(testLog)) +} diff --git a/e2e/main_test.go b/e2e/main_test.go index 5200c16..c86cdd4 100644 --- a/e2e/main_test.go +++ b/e2e/main_test.go @@ -4,9 +4,15 @@ package e2e import ( + "context" "flag" + "fmt" "testing" + "github.com/containerd/containerd" + "github.com/containerd/containerd/cio" + "github.com/containerd/containerd/namespaces" + "github.com/containerd/containerd/oci" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" ) @@ -15,12 +21,14 @@ const ( // LogDriver options logDriverTypeKey = "--log-driver" awslogsDriverName = "awslogs" + fluentdDriverName = "fluentd" containerIdKey = "--container-id" containerNameKey = "--container-name" testContainerId = "test-container-id" testContainerName = "test-container-name" containerdAddress = "/run/containerd/containerd.sock" testImage = "public.ecr.aws/docker/library/ubuntu:latest" + testLog = "test-e2e-log" ) var ( @@ -36,8 +44,44 @@ func TestShimLoggers(t *testing.T) { if *LogDriver == awslogsDriverName || *LogDriver == "" { testAwslogs() } + if *LogDriver == fluentdDriverName || *LogDriver == "" { + testFluentd() + } }) gomega.RegisterFailHandler(ginkgo.Fail) ginkgo.RunSpecs(t, description) } + +func sendTestLogByContainerd(creator cio.Creator, testLog string) { + // Create a new client connected to the containerd daemon + client, err := containerd.New(containerdAddress) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer client.Close() + // Create a new context with a customized namespace + ctx := namespaces.WithNamespace(context.Background(), "testShimLoggers") + // Pull an image + image, err := client.Pull(ctx, testImage, containerd.WithPullUnpack) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + // Create a new container with the pulled image + container, err := client.NewContainer(ctx, testContainerId+"9", containerd.WithImage(image), + containerd.WithNewSnapshot("test-snapshot-9", image), containerd.WithNewSpec(oci.WithImageConfig(image), + oci.WithProcessArgs("/bin/sh", "-c", fmt.Sprintf("echo '%s'", testLog)))) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer container.Delete(ctx, containerd.WithSnapshotCleanup) //nolint:errcheck // testing only + // Create a new task from the container and start it + task, err := container.NewTask(ctx, creator) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + defer task.Delete(ctx) //nolint:errcheck // testing only + + err = task.Start(ctx) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + + statusC, err := task.Wait(ctx) + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + // Waiting for the task to finish + status := <-statusC + code, _, err := status.Result() + gomega.Expect(err).ShouldNot(gomega.HaveOccurred()) + gomega.Expect(code).Should(gomega.Equal(uint32(0))) +}