Skip to content

Commit

Permalink
fix(kafka_topic): configure insufficient brokers error retries timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
byashimov committed Jan 8, 2024
1 parent 07d6ed7 commit f18c3b8
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 18 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ nav_order: 1
<!-- [MAJOR.MINOR.PATCH] - YYYY-MM-DD -->

## [MAJOR.MINOR.PATCH] - YYYY-MM-DD

- Add organization application users support
- Configure "insufficient broker" error retries timeout

## [4.12.1] - 2024-01-05

Expand Down
33 changes: 22 additions & 11 deletions internal/sdkprovider/kafkatopicrepository/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package kafkatopicrepository

import (
"context"
"regexp"
"time"
"fmt"
"strings"

"github.com/aiven/aiven-go-client/v2"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry"
"github.com/avast/retry-go"
)

// reInsufficientBrokers the error message received when kafka is not ready yet
var reInsufficientBrokers = regexp.MustCompile(`Cluster only has [0-9]+ broker`)
// insufficientBrokersErr the error message received when kafka is not ready yet, like
// Cluster only has 2 broker(s), cannot set replication factor to 3
var insufficientBrokersErr = "cannot set replication factor to"

// Create creates a topic.
// First checks if the topic does not exist for the safety
Expand All @@ -31,7 +32,7 @@ func (rep *repository) Create(ctx context.Context, project, service string, req

// When kafka is not ready, it throws reInsufficientBrokers.
// Unfortunately, the error might be valid, so it will take a minute to fail.
err = retry.RetryContext(ctx, time.Minute, func() *retry.RetryError {
err = retry.Do(func() error {
err = rep.client.Create(ctx, project, service, req)
if err == nil {
return nil
Expand All @@ -42,13 +43,23 @@ func (rep *repository) Create(ctx context.Context, project, service string, req
return nil
}

// We must retry this one
if reInsufficientBrokers.MatchString(err.Error()) {
return retry.RetryableError(err)
// We must retry this one.
// Unfortunately, there is no way to tune retries depending on the error.
// So this error might be valid (insufficient brokers), then it will retry until context is expired.
// This timeout can be adjusted:
// https://registry.terraform.io/providers/aiven/aiven/latest/docs/resources/kafka_topic#create
if strings.Contains(err.Error(), insufficientBrokersErr) {
return err
}

// Other errors are non-retryable
return retry.NonRetryableError(err)
})
return retry.Unrecoverable(err)
}, retry.Context(ctx))

// Retry lib returns a custom error object
// we can't compare in tests with
if err != nil {
return fmt.Errorf("topic create error: %s", err)
}
return err
}
51 changes: 48 additions & 3 deletions internal/sdkprovider/kafkatopicrepository/create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package kafkatopicrepository

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/aiven/aiven-go-client/v2"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -69,7 +71,50 @@ func TestCreateRecreateMissing(t *testing.T) {
assert.True(t, rep.seenTopics["a/b/c"]) // cached again
}

func TestReInsufficientBrokers(t *testing.T) {
assert.True(t, reInsufficientBrokers.MatchString(`{"errors":[{"message":"Cluster only has 2 broker(s), cannot set replication factor to 3","status":409}`))
assert.False(t, reInsufficientBrokers.MatchString(`Cluster only has 2 ice creams`))
func TestCreateRetries(t *testing.T) {
errInsufficientBrokers := fmt.Errorf(`{"errors":[{"message":"Cluster only has 2 broker(s), cannot set replication factor to 3","status":409}],"message":"Cluster only has 2 broker(s), cannot set replication factor to 3"}`)
cases := []struct {
name string
createErr []error
expectErr error
expectCalled int32
}{
{
name: "bad request error",
createErr: []error{fmt.Errorf("invalid value")},
expectErr: fmt.Errorf("topic create error: All attempts fail:\n#1: invalid value"),
expectCalled: 1, // exits on the first unknown error
},
{
name: "emulates insufficient broker error when create topic",
createErr: []error{errInsufficientBrokers, errInsufficientBrokers},
expectCalled: 3, // two errors, three calls, the last one successful
},
{
name: "emulates case when 501 retried in client and then 409 received (ignores 409)",
createErr: []error{
aiven.Error{Status: 409, Message: "already exists"},
},
expectCalled: 1, // exists on the first call as it means the topic is created
},
}

for _, opt := range cases {
t.Run(opt.name, func(t *testing.T) {
client := &fakeTopicClient{
createErr: opt.createErr,
}

ctx := context.Background()
rep := newRepository(client)
rep.workerCallInterval = time.Millisecond

req := aiven.CreateKafkaTopicRequest{
TopicName: "my-topic",
}
err := rep.Create(ctx, "foo", "bar", req)
assert.Equal(t, opt.expectErr, err)
assert.Equal(t, opt.expectCalled, client.createCalled)
})
}
}
2 changes: 1 addition & 1 deletion internal/sdkprovider/kafkatopicrepository/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func (rep *repository) fetch(ctx context.Context, queue map[string]*request) {

list = rspList
return nil
}, retry.Delay(rep.v2ListRetryDelay))
}, retry.Context(ctx), retry.Delay(rep.v2ListRetryDelay))

if err != nil {
// Send errors
Expand Down
9 changes: 6 additions & 3 deletions internal/sdkprovider/kafkatopicrepository/repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ type fakeTopicClient struct {
// key format: project/service/topic
storage map[string]*aiven.KafkaListTopic
// errors to return
createErr error
createErr []error
deleteErr error
v1ListErr error
v2ListErr error
Expand All @@ -269,8 +269,11 @@ type fakeTopicClient struct {

func (f *fakeTopicClient) Create(context.Context, string, string, aiven.CreateKafkaTopicRequest) error {
time.Sleep(time.Millisecond * 100) // we need some lag to simulate races
atomic.AddInt32(&f.createCalled, 1)
return f.createErr
attempt := atomic.AddInt32(&f.createCalled, 1) - 1
if int(attempt) >= len(f.createErr) {
return nil
}
return f.createErr[attempt]
}

func (f *fakeTopicClient) Update(context.Context, string, string, string, aiven.UpdateKafkaTopicRequest) error {
Expand Down

0 comments on commit f18c3b8

Please sign in to comment.