From 970c1e44ee6083c654e50dda1122d6466ccaa589 Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Thu, 4 Jan 2024 09:50:00 +0100 Subject: [PATCH] fix(kafka_topic): configure insufficient brokers error retries timeout --- CHANGELOG.md | 2 + .../kafkatopicrepository/create.go | 33 ++++++++---- .../kafkatopicrepository/create_test.go | 51 +++++++++++++++++-- .../sdkprovider/kafkatopicrepository/read.go | 2 +- .../kafkatopicrepository/repository.go | 1 + .../kafkatopicrepository/repository_test.go | 9 ++-- 6 files changed, 80 insertions(+), 18 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f75cf974d..4cc568eea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ nav_order: 1 ## [MAJOR.MINOR.PATCH] - YYYY-MM-DD +- Configure "insufficient broker" error retries timeout + ## [4.12.0] - 2024-01-03 - Fix insufficient brokers error when create kafka topic diff --git a/internal/sdkprovider/kafkatopicrepository/create.go b/internal/sdkprovider/kafkatopicrepository/create.go index 812273b0e..0b9210f28 100644 --- a/internal/sdkprovider/kafkatopicrepository/create.go +++ b/internal/sdkprovider/kafkatopicrepository/create.go @@ -2,15 +2,16 @@ package kafkatopicrepository import ( "context" - "regexp" - "time" + "fmt" + "strings" "github.com/aiven/aiven-go-client/v2" - "github.com/hashicorp/terraform-plugin-sdk/v2/helper/retry" + "github.com/avast/retry-go" ) -// reInsufficientBrokers the error message received when kafka is not ready yet -var reInsufficientBrokers = regexp.MustCompile(`Cluster only has [0-9]+ broker`) +// insufficientBrokersErr the error message received when kafka is not ready yet, like +// Cluster only has 2 broker(s), cannot set replication factor to 3 +var insufficientBrokersErr = "cannot set replication factor to" // Create creates a topic. // First checks if the topic does not exist for the safety @@ -31,7 +32,7 @@ func (rep *repository) Create(ctx context.Context, project, service string, req // When kafka is not ready, it throws reInsufficientBrokers. // Unfortunately, the error might be valid, so it will take a minute to fail. - err = retry.RetryContext(ctx, time.Minute, func() *retry.RetryError { + err = retry.Do(func() error { err = rep.client.Create(ctx, project, service, req) if err == nil { return nil @@ -42,13 +43,23 @@ func (rep *repository) Create(ctx context.Context, project, service string, req return nil } - // We must retry this one - if reInsufficientBrokers.MatchString(err.Error()) { - return retry.RetryableError(err) + // We must retry this one. + // Unfortunately, there is no way to tune retries depending on the error. + // So this error might be valid (insufficient brokers), then it will retry until context is expired. + // This timeout can be adjusted: + // https://registry.terraform.io/providers/aiven/aiven/latest/docs/resources/kafka_topic#create + if strings.Contains(err.Error(), insufficientBrokersErr) { + return err } // Other errors are non-retryable - return retry.NonRetryableError(err) - }) + return retry.Unrecoverable(err) + }, retry.Context(ctx)) + + // Retry lib returns a custom error object + // we can't compare in tests with + if err != nil { + return fmt.Errorf("topic create error: %s", err) + } return err } diff --git a/internal/sdkprovider/kafkatopicrepository/create_test.go b/internal/sdkprovider/kafkatopicrepository/create_test.go index ede5e0ae1..5d3e6cd3c 100644 --- a/internal/sdkprovider/kafkatopicrepository/create_test.go +++ b/internal/sdkprovider/kafkatopicrepository/create_test.go @@ -2,9 +2,11 @@ package kafkatopicrepository import ( "context" + "fmt" "sync" "sync/atomic" "testing" + "time" "github.com/aiven/aiven-go-client/v2" "github.com/stretchr/testify/assert" @@ -69,7 +71,50 @@ func TestCreateRecreateMissing(t *testing.T) { assert.True(t, rep.seenTopics["a/b/c"]) // cached again } -func TestReInsufficientBrokers(t *testing.T) { - assert.True(t, reInsufficientBrokers.MatchString(`{"errors":[{"message":"Cluster only has 2 broker(s), cannot set replication factor to 3","status":409}`)) - assert.False(t, reInsufficientBrokers.MatchString(`Cluster only has 2 ice creams`)) +func TestCreateRetries(t *testing.T) { + errInsufficientBrokers := fmt.Errorf(`{"errors":[{"message":"Cluster only has 2 broker(s), cannot set replication factor to 3","status":409}],"message":"Cluster only has 2 broker(s), cannot set replication factor to 3"}`) + cases := []struct { + name string + createErr []error + expectErr error + expectCalled int32 + }{ + { + name: "bad request error", + createErr: []error{fmt.Errorf("invalid value")}, + expectErr: fmt.Errorf("topic create error: All attempts fail:\n#1: invalid value"), + expectCalled: 1, // exits on the first unknown error + }, + { + name: "emulates insufficient broker error when create topic", + createErr: []error{errInsufficientBrokers, errInsufficientBrokers}, + expectCalled: 3, // two errors, three calls, the last one successful + }, + { + name: "emulates case when 501 retried in client and then 409 received (ignores 409)", + createErr: []error{ + aiven.Error{Status: 409, Message: "already exists"}, + }, + expectCalled: 1, // exists on the first call as it means the topic is created + }, + } + + for _, opt := range cases { + t.Run(opt.name, func(t *testing.T) { + client := &fakeTopicClient{ + createErr: opt.createErr, + } + + ctx := context.Background() + rep := newRepository(client) + rep.workerCallInterval = time.Millisecond + + req := aiven.CreateKafkaTopicRequest{ + TopicName: "my-topic", + } + err := rep.Create(ctx, "foo", "bar", req) + assert.Equal(t, opt.expectErr, err) + assert.Equal(t, opt.expectCalled, client.createCalled) + }) + } } diff --git a/internal/sdkprovider/kafkatopicrepository/read.go b/internal/sdkprovider/kafkatopicrepository/read.go index e3f5234e6..ecce54b63 100644 --- a/internal/sdkprovider/kafkatopicrepository/read.go +++ b/internal/sdkprovider/kafkatopicrepository/read.go @@ -142,7 +142,7 @@ func (rep *repository) fetch(ctx context.Context, queue map[string]*request) { list = rspList return nil - }, retry.Delay(rep.v2ListRetryDelay)) + }, retry.Context(ctx), retry.Delay(rep.v2ListRetryDelay)) if err != nil { // Send errors diff --git a/internal/sdkprovider/kafkatopicrepository/repository.go b/internal/sdkprovider/kafkatopicrepository/repository.go index 005017ca0..42b7cc0f8 100644 --- a/internal/sdkprovider/kafkatopicrepository/repository.go +++ b/internal/sdkprovider/kafkatopicrepository/repository.go @@ -31,6 +31,7 @@ const ( defaultWorkerCallInterval = time.Second defaultSeenTopicsSize = 1000 defaultSeenServicesSize = 10 + defaultCreateAttempts = 5 ) // New returns process singleton Repository diff --git a/internal/sdkprovider/kafkatopicrepository/repository_test.go b/internal/sdkprovider/kafkatopicrepository/repository_test.go index bb52fe0f4..bbf409396 100644 --- a/internal/sdkprovider/kafkatopicrepository/repository_test.go +++ b/internal/sdkprovider/kafkatopicrepository/repository_test.go @@ -256,7 +256,7 @@ type fakeTopicClient struct { // key format: project/service/topic storage map[string]*aiven.KafkaListTopic // errors to return - createErr error + createErr []error deleteErr error v1ListErr error v2ListErr error @@ -269,8 +269,11 @@ type fakeTopicClient struct { func (f *fakeTopicClient) Create(context.Context, string, string, aiven.CreateKafkaTopicRequest) error { time.Sleep(time.Millisecond * 100) // we need some lag to simulate races - atomic.AddInt32(&f.createCalled, 1) - return f.createErr + attempt := atomic.AddInt32(&f.createCalled, 1) - 1 + if int(attempt) >= len(f.createErr) { + return nil + } + return f.createErr[attempt] } func (f *fakeTopicClient) Update(context.Context, string, string, string, aiven.UpdateKafkaTopicRequest) error {