From 1023bea80dc7e5673b55ec0b42d04d97e9ca8ed3 Mon Sep 17 00:00:00 2001 From: Sotirios Mantziaris Date: Sat, 16 Mar 2024 21:53:05 +0200 Subject: [PATCH] Fix tests --- client/amqp/integration_test.go | 27 +++++++++++++++-- .../async/kafka/group/integration_test.go | 10 +++++-- .../async/kafka/simple/integration_test.go | 30 +++++++++++++++---- test/amqp/amqp.go | 28 ----------------- test/kafka/kafka.go | 8 ----- 5 files changed, 57 insertions(+), 46 deletions(-) delete mode 100644 test/amqp/amqp.go diff --git a/client/amqp/integration_test.go b/client/amqp/integration_test.go index 5b88a72111..3410668199 100644 --- a/client/amqp/integration_test.go +++ b/client/amqp/integration_test.go @@ -7,7 +7,6 @@ import ( "context" "testing" - testamqp "github.com/beatlabs/patron/test/amqp" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/ext" "github.com/opentracing/opentracing-go/mocktracer" @@ -27,7 +26,7 @@ func TestRun(t *testing.T) { opentracing.SetGlobalTracer(mtr) t.Cleanup(func() { mtr.Reset() }) - require.NoError(t, testamqp.CreateQueue(endpoint, queue)) + require.NoError(t, createQueue(endpoint, queue)) pub, err := New(endpoint) require.NoError(t, err) @@ -72,3 +71,27 @@ func TestRun(t *testing.T) { assert.NoError(t, channel.Close()) assert.NoError(t, conn.Close()) } + +func createQueue(endpoint, queue string) error { + conn, err := amqp.Dial(endpoint) + if err != nil { + return err + } + + channel, err := conn.Channel() + if err != nil { + return err + } + + _, err = channel.QueueDelete(queue, false, false, false) + if err != nil { + return err + } + + _, err = channel.QueueDeclare(queue, true, false, false, false, nil) + if err != nil { + return err + } + + return nil +} diff --git a/component/async/kafka/group/integration_test.go b/component/async/kafka/group/integration_test.go index c12bc07870..b8f313b4fd 100644 --- a/component/async/kafka/group/integration_test.go +++ b/component/async/kafka/group/integration_test.go @@ -72,7 +72,10 @@ func TestGroupConsume(t *testing.T) { messages := make([]*sarama.ProducerMessage, 0, len(sent)) for _, val := range sent { - messages = append(messages, testkafka.CreateProducerMessage(groupTopic1, val)) + messages = append(messages, &sarama.ProducerMessage{ + Topic: groupTopic1, + Value: sarama.StringEncoder(val), + }) } err := testkafka.SendMessages(broker, messages...) @@ -118,7 +121,10 @@ func TestGroupConsume_ClaimMessageError(t *testing.T) { time.Sleep(5 * time.Second) - err = testkafka.SendMessages(broker, testkafka.CreateProducerMessage(groupTopic2, "321")) + err = testkafka.SendMessages(broker, &sarama.ProducerMessage{ + Topic: groupTopic2, + Value: sarama.StringEncoder("321"), + }) require.NoError(t, err) select { diff --git a/component/async/kafka/simple/integration_test.go b/component/async/kafka/simple/integration_test.go index 3fb5507e69..d54b0f70ed 100644 --- a/component/async/kafka/simple/integration_test.go +++ b/component/async/kafka/simple/integration_test.go @@ -65,7 +65,10 @@ func TestSimpleConsume(t *testing.T) { messages := make([]*sarama.ProducerMessage, 0, len(sent)) for _, val := range sent { - messages = append(messages, testkafka.CreateProducerMessage(simpleTopic1, val)) + messages = append(messages, &sarama.ProducerMessage{ + Topic: simpleTopic1, + Value: sarama.StringEncoder(val), + }) } err := testkafka.SendMessages(broker, messages...) @@ -117,7 +120,10 @@ func TestSimpleConsume_ClaimMessageError(t *testing.T) { time.Sleep(5 * time.Second) - err := testkafka.SendMessages(broker, testkafka.CreateProducerMessage(simpleTopic2, "123")) + err := testkafka.SendMessages(broker, &sarama.ProducerMessage{ + Topic: simpleTopic2, + Value: sarama.StringEncoder("123"), + }) require.NoError(t, err) select { @@ -141,7 +147,10 @@ func TestSimpleConsume_WithDurationOffset(t *testing.T) { messages := make([]*sarama.ProducerMessage, 0) for _, val := range sent { - messages = append(messages, testkafka.CreateProducerMessage(simpleTopic3, val)) + messages = append(messages, &sarama.ProducerMessage{ + Topic: simpleTopic3, + Value: sarama.StringEncoder(val), + }) } err := testkafka.SendMessages(broker, messages...) @@ -206,7 +215,10 @@ func TestSimpleConsume_WithTimestampOffset(t *testing.T) { messages := make([]*sarama.ProducerMessage, 0) for i, tm := range times { val := sent[i] - msg := testkafka.CreateProducerMessage(simpleTopic6, val) + msg := &sarama.ProducerMessage{ + Topic: simpleTopic6, + Value: sarama.StringEncoder(val), + } msg.Timestamp = tm messages = append(messages, msg) } @@ -263,7 +275,10 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset(t *testing.T) { messages := make([]*sarama.ProducerMessage, 0) numberOfMessages := 10 for i := 0; i < numberOfMessages; i++ { - messages = append(messages, testkafka.CreateProducerMessage(simpleTopic4, "foo")) + messages = append(messages, &sarama.ProducerMessage{ + Topic: simpleTopic4, + Value: sarama.StringEncoder("foo"), + }) } err := testkafka.SendMessages(broker, messages...) @@ -379,7 +394,10 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_WithTimestampOff messages := make([]*sarama.ProducerMessage, 0) for i, tm := range times { val := sent[i] - msg := testkafka.CreateProducerMessage(simpleTopic7, val) + msg := &sarama.ProducerMessage{ + Topic: simpleTopic7, + Value: sarama.StringEncoder(val), + } msg.Timestamp = tm messages = append(messages, msg) } diff --git a/test/amqp/amqp.go b/test/amqp/amqp.go deleted file mode 100644 index 3c5587a16a..0000000000 --- a/test/amqp/amqp.go +++ /dev/null @@ -1,28 +0,0 @@ -package amqp - -import "github.com/streadway/amqp" - -// CreateQueue helper function. -func CreateQueue(endpoint, queue string) error { - conn, err := amqp.Dial(endpoint) - if err != nil { - return err - } - - channel, err := conn.Channel() - if err != nil { - return err - } - - _, err = channel.QueueDelete(queue, false, false, false) - if err != nil { - return err - } - - _, err = channel.QueueDeclare(queue, true, false, false, false, nil) - if err != nil { - return err - } - - return nil -} diff --git a/test/kafka/kafka.go b/test/kafka/kafka.go index 2169c836a8..b05112b508 100644 --- a/test/kafka/kafka.go +++ b/test/kafka/kafka.go @@ -128,11 +128,3 @@ func AsyncConsumeMessages(consumer async.Consumer, expectedMessageCount int) ([] } } } - -// CreateProducerMessage for a topic. -func CreateProducerMessage(topic, message string) *sarama.ProducerMessage { - return &sarama.ProducerMessage{ - Topic: topic, - Value: sarama.StringEncoder(message), - } -}