Skip to content

Commit

Permalink
Fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mantzas committed Mar 16, 2024
1 parent a23d6cf commit 1023bea
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 46 deletions.
27 changes: 25 additions & 2 deletions client/amqp/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"context"
"testing"

testamqp "github.com/beatlabs/patron/test/amqp"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/opentracing/opentracing-go/mocktracer"
Expand All @@ -27,7 +26,7 @@ func TestRun(t *testing.T) {
opentracing.SetGlobalTracer(mtr)
t.Cleanup(func() { mtr.Reset() })

require.NoError(t, testamqp.CreateQueue(endpoint, queue))
require.NoError(t, createQueue(endpoint, queue))

pub, err := New(endpoint)
require.NoError(t, err)
Expand Down Expand Up @@ -72,3 +71,27 @@ func TestRun(t *testing.T) {
assert.NoError(t, channel.Close())
assert.NoError(t, conn.Close())
}

func createQueue(endpoint, queue string) error {
conn, err := amqp.Dial(endpoint)
if err != nil {
return err
}

channel, err := conn.Channel()
if err != nil {
return err
}

_, err = channel.QueueDelete(queue, false, false, false)
if err != nil {
return err
}

_, err = channel.QueueDeclare(queue, true, false, false, false, nil)
if err != nil {
return err
}

return nil
}
10 changes: 8 additions & 2 deletions component/async/kafka/group/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,10 @@ func TestGroupConsume(t *testing.T) {

messages := make([]*sarama.ProducerMessage, 0, len(sent))
for _, val := range sent {
messages = append(messages, testkafka.CreateProducerMessage(groupTopic1, val))
messages = append(messages, &sarama.ProducerMessage{
Topic: groupTopic1,
Value: sarama.StringEncoder(val),
})
}

err := testkafka.SendMessages(broker, messages...)
Expand Down Expand Up @@ -118,7 +121,10 @@ func TestGroupConsume_ClaimMessageError(t *testing.T) {

time.Sleep(5 * time.Second)

err = testkafka.SendMessages(broker, testkafka.CreateProducerMessage(groupTopic2, "321"))
err = testkafka.SendMessages(broker, &sarama.ProducerMessage{
Topic: groupTopic2,
Value: sarama.StringEncoder("321"),
})
require.NoError(t, err)

select {
Expand Down
30 changes: 24 additions & 6 deletions component/async/kafka/simple/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ func TestSimpleConsume(t *testing.T) {

messages := make([]*sarama.ProducerMessage, 0, len(sent))
for _, val := range sent {
messages = append(messages, testkafka.CreateProducerMessage(simpleTopic1, val))
messages = append(messages, &sarama.ProducerMessage{
Topic: simpleTopic1,
Value: sarama.StringEncoder(val),
})
}

err := testkafka.SendMessages(broker, messages...)
Expand Down Expand Up @@ -117,7 +120,10 @@ func TestSimpleConsume_ClaimMessageError(t *testing.T) {

time.Sleep(5 * time.Second)

err := testkafka.SendMessages(broker, testkafka.CreateProducerMessage(simpleTopic2, "123"))
err := testkafka.SendMessages(broker, &sarama.ProducerMessage{
Topic: simpleTopic2,
Value: sarama.StringEncoder("123"),
})
require.NoError(t, err)

select {
Expand All @@ -141,7 +147,10 @@ func TestSimpleConsume_WithDurationOffset(t *testing.T) {

messages := make([]*sarama.ProducerMessage, 0)
for _, val := range sent {
messages = append(messages, testkafka.CreateProducerMessage(simpleTopic3, val))
messages = append(messages, &sarama.ProducerMessage{
Topic: simpleTopic3,
Value: sarama.StringEncoder(val),
})
}

err := testkafka.SendMessages(broker, messages...)
Expand Down Expand Up @@ -206,7 +215,10 @@ func TestSimpleConsume_WithTimestampOffset(t *testing.T) {
messages := make([]*sarama.ProducerMessage, 0)
for i, tm := range times {
val := sent[i]
msg := testkafka.CreateProducerMessage(simpleTopic6, val)
msg := &sarama.ProducerMessage{
Topic: simpleTopic6,
Value: sarama.StringEncoder(val),
}
msg.Timestamp = tm
messages = append(messages, msg)
}
Expand Down Expand Up @@ -263,7 +275,10 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset(t *testing.T) {
messages := make([]*sarama.ProducerMessage, 0)
numberOfMessages := 10
for i := 0; i < numberOfMessages; i++ {
messages = append(messages, testkafka.CreateProducerMessage(simpleTopic4, "foo"))
messages = append(messages, &sarama.ProducerMessage{
Topic: simpleTopic4,
Value: sarama.StringEncoder("foo"),
})
}

err := testkafka.SendMessages(broker, messages...)
Expand Down Expand Up @@ -379,7 +394,10 @@ func TestSimpleConsume_WithNotificationOnceReachingLatestOffset_WithTimestampOff
messages := make([]*sarama.ProducerMessage, 0)
for i, tm := range times {
val := sent[i]
msg := testkafka.CreateProducerMessage(simpleTopic7, val)
msg := &sarama.ProducerMessage{
Topic: simpleTopic7,
Value: sarama.StringEncoder(val),
}
msg.Timestamp = tm
messages = append(messages, msg)
}
Expand Down
28 changes: 0 additions & 28 deletions test/amqp/amqp.go

This file was deleted.

8 changes: 0 additions & 8 deletions test/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,3 @@ func AsyncConsumeMessages(consumer async.Consumer, expectedMessageCount int) ([]
}
}
}

// CreateProducerMessage for a topic.
func CreateProducerMessage(topic, message string) *sarama.ProducerMessage {
return &sarama.ProducerMessage{
Topic: topic,
Value: sarama.StringEncoder(message),
}
}

0 comments on commit 1023bea

Please sign in to comment.