From aa6ca1656283065aea8f64ab451e111deb3b9d11 Mon Sep 17 00:00:00 2001 From: Sven Rebhan Date: Mon, 23 Oct 2023 18:07:53 +0200 Subject: [PATCH] test(outputs.kafka): Rework integration test to use bitnami/kafka --- plugins/outputs/kafka/kafka.go | 3 +- plugins/outputs/kafka/kafka_test.go | 84 ++++++++++------------------- testutil/container.go | 2 + 3 files changed, 31 insertions(+), 58 deletions(-) diff --git a/plugins/outputs/kafka/kafka.go b/plugins/outputs/kafka/kafka.go index 89a653794fbc4..e772ada98df17 100644 --- a/plugins/outputs/kafka/kafka.go +++ b/plugins/outputs/kafka/kafka.go @@ -133,8 +133,6 @@ func (k *Kafka) Init() error { return err } - k.saramaConfig = config - // Legacy support ssl config if k.Certificate != "" { k.TLSCert = k.Certificate @@ -151,6 +149,7 @@ func (k *Kafka) Init() error { } config.Net.Proxy.Dialer = dialer } + k.saramaConfig = config return nil } diff --git a/plugins/outputs/kafka/kafka_test.go b/plugins/outputs/kafka/kafka_test.go index 821153db5d61b..858b45308dd45 100644 --- a/plugins/outputs/kafka/kafka_test.go +++ b/plugins/outputs/kafka/kafka_test.go @@ -1,14 +1,12 @@ package kafka import ( - "context" - "fmt" "testing" "time" "github.com/Shopify/sarama" + "github.com/docker/go-connections/nat" "github.com/stretchr/testify/require" - "github.com/testcontainers/testcontainers-go" "github.com/testcontainers/testcontainers-go/wait" "github.com/influxdata/telegraf" @@ -27,73 +25,47 @@ func TestConnectAndWriteIntegration(t *testing.T) { t.Skip("Skipping integration test in short mode") } - ctx := context.Background() - networkName := "kafka-test-network" - net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{ - NetworkRequest: testcontainers.NetworkRequest{ - Name: networkName, - Attachable: true, - CheckDuplicate: true, - }, - }) - require.NoError(t, err) - defer func() { - require.NoError(t, net.Remove(ctx), "terminating network failed") - }() - - zookeeper := testutil.Container{ - Image: "wurstmeister/zookeeper", - ExposedPorts: []string{"2181:2181"}, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("binding to port"), - Name: "telegraf-test-zookeeper", - } - err = zookeeper.Start() - require.NoError(t, err, "failed to start container") - defer zookeeper.Terminate() - + // Start the container as broker AND controller container := testutil.Container{ - Image: "wurstmeister/kafka", - ExposedPorts: []string{"9092:9092"}, + Image: "bitnami/kafka", + Hostname: "localhost", // required to be able to resolve the name + ExposedPorts: []string{"9092:9092", "9093:9093"}, Env: map[string]string{ - "KAFKA_ADVERTISED_HOST_NAME": "localhost", - "KAFKA_ADVERTISED_PORT": "9092", - "KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("telegraf-test-zookeeper:%s", zookeeper.Ports["2181"]), - "KAFKA_CREATE_TOPICS": "Test:1:1", + "KAFKA_CFG_NODE_ID": "0", + "KAFKA_CFG_PROCESS_ROLES": "controller,broker", + "KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093", + "KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT", + "KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@localhost:9093", + "KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER", }, - Networks: []string{networkName}, - WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"), + WaitingFor: wait.ForAll( + wait.ForListeningPort(nat.Port("9092")), + wait.ForLog("Kafka Server started"), + ), } - err = container.Start() - require.NoError(t, err, "failed to start container") + require.NoError(t, container.Start(), "failed to start container") defer container.Terminate() - brokers := []string{ - fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]), - } - - s := &influx.Serializer{} - require.NoError(t, s.Init()) - - k := &Kafka{ - Brokers: brokers, + // Setup the plugin + plugin := &Kafka{ + Brokers: []string{container.Address + ":" + container.Ports["9092"]}, Topic: "Test", Log: testutil.Logger{}, - serializer: s, producerFunc: sarama.NewSyncProducer, } + // Setup the metric serializer + s := &influx.Serializer{} + require.NoError(t, s.Init()) + plugin.SetSerializer(s) + // Verify that we can connect to the Kafka broker - err = k.Init() - require.NoError(t, err) - err = k.Connect() - require.NoError(t, err) + require.NoError(t, plugin.Init()) + require.NoError(t, plugin.Connect()) + defer plugin.Close() // Verify that we can successfully write data to the kafka broker - err = k.Write(testutil.MockMetrics()) - require.NoError(t, err) - err = k.Close() - require.NoError(t, err) + require.NoError(t, plugin.Write(testutil.MockMetrics())) } func TestTopicSuffixes(t *testing.T) { diff --git a/testutil/container.go b/testutil/container.go index f80ea5a04a8bf..994df82ff7b50 100644 --- a/testutil/container.go +++ b/testutil/container.go @@ -31,6 +31,7 @@ type Container struct { Cmd []string Image string Name string + Hostname string Networks []string WaitingFor wait.Strategy @@ -60,6 +61,7 @@ func (c *Container) Start() error { Cmd: c.Cmd, Image: c.Image, Name: c.Name, + Hostname: c.Hostname, Networks: c.Networks, WaitingFor: c.WaitingFor, },