Skip to content

Commit

Permalink
test(outputs.kafka): Rework integration test to use bitnami/kafka
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan committed Oct 23, 2023
1 parent 171dccb commit aa6ca16
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 58 deletions.
3 changes: 1 addition & 2 deletions plugins/outputs/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,6 @@ func (k *Kafka) Init() error {
return err
}

k.saramaConfig = config

// Legacy support ssl config
if k.Certificate != "" {
k.TLSCert = k.Certificate
Expand All @@ -151,6 +149,7 @@ func (k *Kafka) Init() error {
}
config.Net.Proxy.Dialer = dialer
}
k.saramaConfig = config

return nil
}
Expand Down
84 changes: 28 additions & 56 deletions plugins/outputs/kafka/kafka_test.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package kafka

import (
"context"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/require"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"

"github.com/influxdata/telegraf"
Expand All @@ -27,73 +25,47 @@ func TestConnectAndWriteIntegration(t *testing.T) {
t.Skip("Skipping integration test in short mode")
}

ctx := context.Background()
networkName := "kafka-test-network"
net, err := testcontainers.GenericNetwork(ctx, testcontainers.GenericNetworkRequest{
NetworkRequest: testcontainers.NetworkRequest{
Name: networkName,
Attachable: true,
CheckDuplicate: true,
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, net.Remove(ctx), "terminating network failed")
}()

zookeeper := testutil.Container{
Image: "wurstmeister/zookeeper",
ExposedPorts: []string{"2181:2181"},
Networks: []string{networkName},
WaitingFor: wait.ForLog("binding to port"),
Name: "telegraf-test-zookeeper",
}
err = zookeeper.Start()
require.NoError(t, err, "failed to start container")
defer zookeeper.Terminate()

// Start the container as broker AND controller
container := testutil.Container{
Image: "wurstmeister/kafka",
ExposedPorts: []string{"9092:9092"},
Image: "bitnami/kafka",
Hostname: "localhost", // required to be able to resolve the name
ExposedPorts: []string{"9092:9092", "9093:9093"},
Env: map[string]string{
"KAFKA_ADVERTISED_HOST_NAME": "localhost",
"KAFKA_ADVERTISED_PORT": "9092",
"KAFKA_ZOOKEEPER_CONNECT": fmt.Sprintf("telegraf-test-zookeeper:%s", zookeeper.Ports["2181"]),
"KAFKA_CREATE_TOPICS": "Test:1:1",
"KAFKA_CFG_NODE_ID": "0",
"KAFKA_CFG_PROCESS_ROLES": "controller,broker",
"KAFKA_CFG_LISTENERS": "PLAINTEXT://:9092,CONTROLLER://:9093",
"KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP": "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT",
"KAFKA_CFG_CONTROLLER_QUORUM_VOTERS": "0@localhost:9093",
"KAFKA_CFG_CONTROLLER_LISTENER_NAMES": "CONTROLLER",
},
Networks: []string{networkName},
WaitingFor: wait.ForLog("Log loaded for partition Test-0 with initial high watermark 0"),
WaitingFor: wait.ForAll(
wait.ForListeningPort(nat.Port("9092")),
wait.ForLog("Kafka Server started"),
),
}
err = container.Start()
require.NoError(t, err, "failed to start container")
require.NoError(t, container.Start(), "failed to start container")
defer container.Terminate()

brokers := []string{
fmt.Sprintf("%s:%s", container.Address, container.Ports["9092"]),
}

s := &influx.Serializer{}
require.NoError(t, s.Init())

k := &Kafka{
Brokers: brokers,
// Setup the plugin
plugin := &Kafka{
Brokers: []string{container.Address + ":" + container.Ports["9092"]},
Topic: "Test",
Log: testutil.Logger{},
serializer: s,
producerFunc: sarama.NewSyncProducer,
}

// Setup the metric serializer
s := &influx.Serializer{}
require.NoError(t, s.Init())
plugin.SetSerializer(s)

// Verify that we can connect to the Kafka broker
err = k.Init()
require.NoError(t, err)
err = k.Connect()
require.NoError(t, err)
require.NoError(t, plugin.Init())
require.NoError(t, plugin.Connect())
defer plugin.Close()

// Verify that we can successfully write data to the kafka broker
err = k.Write(testutil.MockMetrics())
require.NoError(t, err)
err = k.Close()
require.NoError(t, err)
require.NoError(t, plugin.Write(testutil.MockMetrics()))
}

func TestTopicSuffixes(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions testutil/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Container struct {
Cmd []string
Image string
Name string
Hostname string
Networks []string
WaitingFor wait.Strategy

Expand Down Expand Up @@ -60,6 +61,7 @@ func (c *Container) Start() error {
Cmd: c.Cmd,
Image: c.Image,
Name: c.Name,
Hostname: c.Hostname,
Networks: c.Networks,
WaitingFor: c.WaitingFor,
},
Expand Down

0 comments on commit aa6ca16

Please sign in to comment.