diff --git a/component/kafka/integration_test.go b/component/kafka/integration_test.go index 84a6b8bd6..1f77b8081 100644 --- a/component/kafka/integration_test.go +++ b/component/kafka/integration_test.go @@ -30,6 +30,7 @@ const ( failAllRetriesTopic2 = "failAllRetriesTopic2" failAndRetryTopic2 = "failAndRetryTopic2" broker = "127.0.0.1:9093" + groupSuffix = "-group" ) func TestKafkaComponent_Success(t *testing.T) { @@ -257,7 +258,7 @@ func TestGroupConsume_CheckTopicFailsDueToNonExistingTopic(t *testing.T) { return nil } invalidTopicName := "invalid-topic-name" - _, err := New(invalidTopicName, invalidTopicName+"-group", []string{broker}, + _, err := New(invalidTopicName, invalidTopicName+groupSuffix, []string{broker}, []string{invalidTopicName}, processorFunc, sarama.NewConfig(), WithCheckTopic()) require.EqualError(t, err, "topic invalid-topic-name does not exist in broker") } @@ -267,7 +268,7 @@ func TestGroupConsume_CheckTopicFailsDueToNonExistingBroker(t *testing.T) { processorFunc := func(batch Batch) error { return nil } - _, err := New(successTopic2, successTopic2+"-group", []string{"127.0.0.1:9999"}, + _, err := New(successTopic2, successTopic2+groupSuffix, []string{"127.0.0.1:9999"}, []string{successTopic2}, processorFunc, sarama.NewConfig(), WithCheckTopic()) require.NotNil(t, err) require.Contains(t, err.Error(), "failed to create client:") @@ -279,7 +280,7 @@ func newComponent(t *testing.T, name string, retries uint, batchSize uint, proce saramaCfg.Version = sarama.V2_6_0_0 require.NoError(t, err) - cmp, err := New(name, name+"-group", []string{broker}, []string{name}, processorFunc, + cmp, err := New(name, name+groupSuffix, []string{broker}, []string{name}, processorFunc, saramaCfg, WithFailureStrategy(ExitStrategy), WithBatchSize(batchSize), WithBatchTimeout(100*time.Millisecond), WithRetries(retries), WithRetryWait(200*time.Millisecond), WithCommitSync(), WithCheckTopic()) require.NoError(t, err)