Skip to content

Commit

Permalink
Cleanup tests
Browse files Browse the repository at this point in the history
  • Loading branch information
anodar committed Apr 5, 2024
1 parent 9d450af commit 8da1e86
Showing 1 changed file with 19 additions and 1 deletion.
20 changes: 19 additions & 1 deletion pubsub/pubsub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,14 @@ func createGroup(ctx context.Context, t *testing.T, streamName, groupName string
}
}

func destroyGroup(ctx context.Context, t *testing.T, streamName, groupName string, client redis.UniversalClient) {
t.Helper()
_, err := client.XGroupDestroy(ctx, streamName, groupName).Result()
if err != nil {
t.Fatalf("Error creating stream group: %v", err)
}
}

type configOpt interface {
apply(consCfg *ConsumerConfig, prodCfg *ProducerConfig)
}
Expand Down Expand Up @@ -99,6 +107,16 @@ func newProducerConsumers(ctx context.Context, t *testing.T, opts ...configOpt)
consumers = append(consumers, c)
}
createGroup(ctx, t, streamName, groupName, producer.client)
t.Cleanup(func() {
destroyGroup(ctx, t, streamName, groupName, producer.client)
var keys []string
for _, c := range consumers {
keys = append(keys, c.heartBeatKey())
}
if _, err := producer.client.Del(ctx, keys...).Result(); err != nil {
t.Fatalf("Error deleting heartbeat keys: %v\n", err)
}
})
return producer, consumers
}

Expand Down Expand Up @@ -355,7 +373,7 @@ func TestRedisClaimingOwnershipReproduceDisabled(t *testing.T) {
if len(gotResponses) != wantMsgCnt {
t.Errorf("Got %d responses want: %d\n", len(gotResponses), wantMsgCnt)
}
if cnt := len(producer.promises); cnt != 0 {
if cnt := producer.promisesLen(); cnt != 0 {
t.Errorf("Producer still has %d unfullfilled promises", cnt)
}
}
Expand Down

0 comments on commit 8da1e86

Please sign in to comment.