From 5b8df24876938d32f1481216f7bb515d1b3420ef Mon Sep 17 00:00:00 2001 From: Murad Biashimov Date: Fri, 8 Sep 2023 13:05:08 +0200 Subject: [PATCH] refactor(kafkatopic): add KafkaTopic repository [skip ci] --- go.mod | 4 +- go.sum | 7 +- .../kafkatopicrepository/create.go | 32 ++ .../kafkatopicrepository/create_test.go | 70 +++++ .../kafkatopicrepository/delete.go | 22 ++ .../kafkatopicrepository/delete_test.go | 40 +++ .../sdkprovider/kafkatopicrepository/read.go | 163 ++++++++++ .../kafkatopicrepository/repository.go | 163 ++++++++++ .../kafkatopicrepository/repository_test.go | 285 ++++++++++++++++++ .../kafkatopicrepository/update.go | 11 + .../service/kafkatopic/kafka_topic.go | 118 +------- .../service/kafkatopic/kafka_topic_cache.go | 205 ------------- .../kafkatopic/kafka_topic_cache_test.go | 192 ------------ .../service/kafkatopic/kafka_topic_test.go | 8 +- .../service/kafkatopic/kafka_topic_wait.go | 141 --------- 15 files changed, 807 insertions(+), 654 deletions(-) create mode 100644 internal/sdkprovider/kafkatopicrepository/create.go create mode 100644 internal/sdkprovider/kafkatopicrepository/create_test.go create mode 100644 internal/sdkprovider/kafkatopicrepository/delete.go create mode 100644 internal/sdkprovider/kafkatopicrepository/delete_test.go create mode 100644 internal/sdkprovider/kafkatopicrepository/read.go create mode 100644 internal/sdkprovider/kafkatopicrepository/repository.go create mode 100644 internal/sdkprovider/kafkatopicrepository/repository_test.go create mode 100644 internal/sdkprovider/kafkatopicrepository/update.go delete mode 100644 internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go delete mode 100644 internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go delete mode 100644 internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go diff --git a/go.mod b/go.mod index 9b9c0bf59..20393e0ba 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.21.1 require ( github.com/aiven/aiven-go-client/v2 v2.0.0 + github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 github.com/ettle/strcase v0.1.1 @@ -15,9 +16,10 @@ require ( github.com/hashicorp/terraform-plugin-mux v0.12.0 github.com/hashicorp/terraform-plugin-sdk/v2 v2.29.0 github.com/kelseyhightower/envconfig v1.4.0 + github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 - golang.org/x/sync v0.3.0 + golang.org/x/sync v0.1.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 499c5067c..0073ea43a 100644 --- a/go.sum +++ b/go.sum @@ -212,6 +212,8 @@ github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJE github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= github.com/apparentlymart/go-textseg/v15 v15.0.0 h1:uYvfpb3DyLSCGWnctWKGj857c6ew1u1fNQOlOtuGxQY= github.com/apparentlymart/go-textseg/v15 v15.0.0/go.mod h1:K8XmNZdhEBkdlyDdvbmmsvpAG721bKi0joRfFdHIWJ4= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/aws/aws-sdk-go v1.44.122 h1:p6mw01WBaNpbdP2xrisz5tIkcNwzj/HysobNoaAHjgo= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas= @@ -515,6 +517,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= @@ -719,9 +723,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/sdkprovider/kafkatopicrepository/create.go b/internal/sdkprovider/kafkatopicrepository/create.go new file mode 100644 index 000000000..944791696 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/create.go @@ -0,0 +1,32 @@ +package kafkatopicrepository + +import ( + "context" + + "github.com/aiven/aiven-go-client/v2" +) + +// Create creates topic. +// First checks if topic does not exist for the safety +// Then calls creates topic. +func (rep *repository) Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error { + // aiven.KafkaTopics.Create() function may return 501 on create + // Second call might say that topic already exists, and we have retries in aiven client + // So to be sure, better check it before create + err := rep.exists(ctx, project, service, req.TopicName, true) + if err == nil { + return errAlreadyExists + } + + // If this is not errNotFound, then something happened + if err != errNotFound { + return err + } + + // 501 is retried in the client, so it can return 429 + err = rep.client.Create(ctx, project, service, req) + if aiven.IsAlreadyExists(err) { + return nil + } + return err +} diff --git a/internal/sdkprovider/kafkatopicrepository/create_test.go b/internal/sdkprovider/kafkatopicrepository/create_test.go new file mode 100644 index 000000000..39463e69f --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/create_test.go @@ -0,0 +1,70 @@ +package kafkatopicrepository + +import ( + "context" + "sync" + "sync/atomic" + "testing" + + "github.com/aiven/aiven-go-client/v2" + "github.com/stretchr/testify/assert" +) + +// TestCreateConflict tests that one goroutine out of 100 creates topic, while others get errAlreadyExists +func TestCreateConflict(t *testing.T) { + client := &fakeTopicClient{} + rep := newRepository(client) + ctx := context.Background() + + var conflictErr int32 + var wg sync.WaitGroup + for i := 0; i < 100; i++ { + wg.Add(1) + go func() { + err := rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"}) + if err == errAlreadyExists { + atomic.AddInt32(&conflictErr, 1) + } + wg.Done() + }() + } + wg.Wait() + assert.EqualValues(t, 99, conflictErr) + assert.EqualValues(t, 1, client.createCalled) + assert.EqualValues(t, 1, client.v1ListCalled) + assert.EqualValues(t, 0, client.v2ListCalled) + assert.True(t, rep.seenServices["a/b"]) + assert.True(t, rep.seenTopics["a/b/c"]) +} + +// TestCreateRecreateMissing must recreate missing topic +// When Kafka is off, it looses all topics. We recreate them instead of making user clear the state +func TestCreateRecreateMissing(t *testing.T) { + client := &fakeTopicClient{} + rep := newRepository(client) + ctx := context.Background() + + // Creates topic + err := rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"}) + assert.NoError(t, err) + assert.EqualValues(t, 1, client.createCalled) + assert.EqualValues(t, 1, client.v1ListCalled) + assert.EqualValues(t, 0, client.v2ListCalled) + assert.True(t, rep.seenServices["a/b"]) + assert.True(t, rep.seenTopics["a/b/c"]) + + // Forgets the topic, like if it's missing + err = rep.forgetTopic("a", "b", "c") + assert.NoError(t, err) + assert.True(t, rep.seenServices["a/b"]) + assert.False(t, rep.seenTopics["a/b/c"]) // not cached, missing + + // Recreates topic + err = rep.Create(ctx, "a", "b", aiven.CreateKafkaTopicRequest{TopicName: "c"}) + assert.NoError(t, err) + assert.EqualValues(t, 2, client.createCalled) // Updated + assert.EqualValues(t, 1, client.v1ListCalled) + assert.EqualValues(t, 0, client.v2ListCalled) + assert.True(t, rep.seenServices["a/b"]) + assert.True(t, rep.seenTopics["a/b/c"]) // cached again +} diff --git a/internal/sdkprovider/kafkatopicrepository/delete.go b/internal/sdkprovider/kafkatopicrepository/delete.go new file mode 100644 index 000000000..e38c9c381 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/delete.go @@ -0,0 +1,22 @@ +package kafkatopicrepository + +import ( + "context" + + "github.com/aiven/aiven-go-client/v2" +) + +func (rep *repository) Delete(ctx context.Context, project, service, topic string) error { + // This might give us false positive + // But it speedups things a lot, and if kafka has been off, + // then it will make easier to remove topics from state + err := rep.client.Delete(ctx, project, service, topic) + if !(err == nil || aiven.IsNotFound(err)) { + return err + } + + rep.Lock() + rep.seenTopics[newKey(project, service, topic)] = false + rep.Unlock() + return nil +} diff --git a/internal/sdkprovider/kafkatopicrepository/delete_test.go b/internal/sdkprovider/kafkatopicrepository/delete_test.go new file mode 100644 index 000000000..10b311019 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/delete_test.go @@ -0,0 +1,40 @@ +package kafkatopicrepository + +import ( + "context" + "testing" + + "github.com/aiven/aiven-go-client/v2" + "github.com/stretchr/testify/assert" +) + +// TestDeleteDoesNotExist shouldn't rise that topic does not exist on delete, +// if it doesn't exist for real +func TestDeleteDoesNotExist(t *testing.T) { + client := &fakeTopicClient{} + rep := newRepository(client) + ctx := context.Background() + err := rep.Delete(ctx, "a", "b", "c") + assert.NoError(t, err) + assert.EqualValues(t, 0, client.v1ListCalled) + assert.EqualValues(t, 0, client.v2ListCalled) + assert.EqualValues(t, 1, client.deleteCalled) +} + +// TestDeletesAfterRetry proves that it deletes topic +// when client has made retries under the hood and got 404 on some call +func TestDeletesAfterRetry(t *testing.T) { + client := &fakeTopicClient{ + deleteErr: errNotFound, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + } + rep := newRepository(client) + ctx := context.Background() + err := rep.Delete(ctx, "a", "b", "c") + assert.NoError(t, err) + assert.EqualValues(t, 0, client.v1ListCalled) + assert.EqualValues(t, 0, client.v2ListCalled) + assert.EqualValues(t, 1, client.deleteCalled) +} diff --git a/internal/sdkprovider/kafkatopicrepository/read.go b/internal/sdkprovider/kafkatopicrepository/read.go new file mode 100644 index 000000000..e3f5234e6 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/read.go @@ -0,0 +1,163 @@ +package kafkatopicrepository + +import ( + "context" + "fmt" + + "github.com/aiven/aiven-go-client/v2" + "github.com/avast/retry-go" + "github.com/samber/lo" +) + +func (rep *repository) Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) { + // We have quick methods to determine that topic does not exist + err := rep.exists(ctx, project, service, topic, false) + if err != nil { + return nil, err + } + + // Adds request to the queue + c := make(chan *response, 1) + r := &request{ + project: project, + service: service, + topic: topic, + rsp: c, + } + rep.Lock() + rep.queue = append(rep.queue, r) + rep.Unlock() + + // Waits response from the channel + // Or exits on context done + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rsp := <-c: + close(c) + return rsp.topic, rsp.err + } +} + +// exists returns nil if topic exists, or errNotFound if doesn't: +// 1. checks repository.seenTopics for known topics +// 2. calls v1List for the remote state for the given service and marks it in repository.seenServices +// 3. saves topic names to repository.seenTopics, so its result can be reused +// 4. when acquire true, then saves topic to repository.seenTopics (for creating) +// todo: use context with the new client +func (rep *repository) exists(ctx context.Context, project, service, topic string, acquire bool) error { + rep.Lock() + defer rep.Unlock() + // Checks repository.seenTopics. + // If it has been just created, it is not available in v1List. + // So calling it first doesn't make any sense + serviceKey := newKey(project, service) + topicKey := newKey(serviceKey, topic) + if rep.seenTopics[topicKey] { + return nil + } + + // Goes for v1List + if !rep.seenServices[serviceKey] { + list, err := rep.client.List(ctx, project, service) + if err != nil { + return err + } + + // Marks seen all the topics + for _, t := range list { + rep.seenTopics[newKey(serviceKey, t.TopicName)] = true + } + + // Service is seen too. It never goes here again + rep.seenServices[serviceKey] = true + } + + // Checks updated list + if rep.seenTopics[topicKey] { + return nil + } + + // Create functions run in parallel need to lock the name before create + // Otherwise they may run into conflict + if acquire { + rep.seenTopics[topicKey] = true + } + + // v1List doesn't contain the topic + return errNotFound +} + +// fetch fetches requested topics configuration +// 1. groups topics by service +// 2. requests topics (in chunks) +// Warning: if we call V2List with at least one "not found" topic, it will return 404 for all topics +// Should be certain that all topics in queue do exist. Call repository.exists first to do so +func (rep *repository) fetch(ctx context.Context, queue map[string]*request) { + // Groups topics by service + byService := make(map[string][]*request, 0) + for i := range queue { + r := queue[i] + key := newKey(r.project, r.service) + byService[key] = append(byService[key], r) + } + + // Fetches topics configuration + for _, reqs := range byService { + topicNames := make([]string, 0, len(reqs)) + for _, r := range reqs { + topicNames = append(topicNames, r.topic) + } + + // Topics are grouped by service + // We can share this values + project := reqs[0].project + service := reqs[0].service + + // Slices topic names by repository.v2ListBatchSize + // because V2List has a limit + for _, chunk := range lo.Chunk(topicNames, rep.v2ListBatchSize) { + // V2List() and Get() do not get info immediately + // Some retries should be applied if result is not equal to requested values + var list []*aiven.KafkaTopic + err := retry.Do(func() error { + rspList, err := rep.client.V2List(ctx, project, service, chunk) + + // 404 means that there is "not found" on the list + // But repository.exists should have checked these, so now this is a fail + if aiven.IsNotFound(err) { + return retry.Unrecoverable(fmt.Errorf("topic list has changed")) + } + + // Something else happened + // We have retries in the client, so this is bad + if err != nil { + return retry.Unrecoverable(err) + } + + // This is an old cache, we need to retry it until succeed + if len(rspList) != len(chunk) { + return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk)) + } + + list = rspList + return nil + }, retry.Delay(rep.v2ListRetryDelay)) + + if err != nil { + // Send errors + // Flattens error to a string, because it might go really completed for testing + err = fmt.Errorf("topic read error: %s", err) + for _, r := range reqs { + r.send(nil, err) + } + continue + } + + // Sends topics + for _, t := range list { + queue[newKey(project, service, t.TopicName)].send(t, nil) + } + } + } +} diff --git a/internal/sdkprovider/kafkatopicrepository/repository.go b/internal/sdkprovider/kafkatopicrepository/repository.go new file mode 100644 index 000000000..8e2295df0 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/repository.go @@ -0,0 +1,163 @@ +package kafkatopicrepository + +import ( + "context" + "net/http" + "strings" + "sync" + "time" + + "github.com/aiven/aiven-go-client/v2" +) + +var ( + initOnce sync.Once + // singleRep a singleton for repository + singleRep = &repository{} + // errNotFound mimics Aiven "not found" error. Never wrap it, so it can be determined by aiven.IsNotFound + errNotFound = aiven.Error{Status: http.StatusNotFound, Message: "Topic not found"} + // errAlreadyExists mimics Aiven "conflict" error. Never wrap it, so it can be determined by aiven.IsAlreadyExists + errAlreadyExists = aiven.Error{Status: http.StatusConflict, Message: "Topic conflict, already exists"} +) + +const ( + // defaultV2ListBatchSize the max size of batch to call V2List + defaultV2ListBatchSize = 100 + + // defaultV2ListRetryDelay V2List caches results, so we retry it by this delay + defaultV2ListRetryDelay = 5 * time.Second + + // defaultWorkerCallInterval how often worker should run + defaultWorkerCallInterval = time.Second + defaultSeenTopicsSize = 1000 + defaultSeenServicesSize = 10 +) + +// New returns process singleton Repository +func New(client topicsClient) Repository { + initOnce.Do(func() { + singleRep = newRepository(client) + go singleRep.worker() + }) + return singleRep +} + +// Repository CRUD interface for topics +type Repository interface { + Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error + Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) + Update(ctx context.Context, project, service, topic string, req aiven.UpdateKafkaTopicRequest) error + Delete(ctx context.Context, project, service, topic string) error +} + +// topicsClient interface for unit tests +type topicsClient interface { + List(ctx context.Context, project, service string) ([]*aiven.KafkaListTopic, error) + V2List(ctx context.Context, project, service string, topicNames []string) ([]*aiven.KafkaTopic, error) + Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error + Update(ctx context.Context, project, service, topic string, req aiven.UpdateKafkaTopicRequest) error + Delete(ctx context.Context, project, service, topic string) error +} + +func newRepository(client topicsClient) *repository { + r := &repository{ + client: client, + seenTopics: make(map[string]bool, defaultSeenTopicsSize), + seenServices: make(map[string]bool, defaultSeenServicesSize), + v2ListBatchSize: defaultV2ListBatchSize, + v2ListRetryDelay: defaultV2ListRetryDelay, + workerCallInterval: defaultWorkerCallInterval, + } + return r +} + +// repository implements Repository +// Handling thousands of topics might be challenging for the API +// This repository uses retries, rate-limiting, queueing, caching to provide with best speed/durability ratio +// Must be used as a singleton. See singleRep. +type repository struct { + sync.Mutex + client topicsClient + queue []*request + v2ListBatchSize int + v2ListRetryDelay time.Duration + workerCallInterval time.Duration + + // seenTopics stores topic names from v1List and Create() + // because v1List might not return fresh topics + seenTopics map[string]bool + + // seenServices stores true if v1List was called for the service + seenServices map[string]bool +} + +// worker processes the queue with fetch and ticker (rate-limit). Runs in the background. +func (rep *repository) worker() { + ticker := time.NewTicker(rep.workerCallInterval) + for { + <-ticker.C + b := rep.withdraw() + if b != nil { + rep.fetch(context.Background(), b) + } + } +} + +// withdraw returns the queue and cleans it +func (rep *repository) withdraw() map[string]*request { + rep.Lock() + defer rep.Unlock() + + if len(rep.queue) == 0 { + return nil + } + + q := make(map[string]*request, len(rep.queue)) + for _, r := range rep.queue { + q[r.key()] = r + } + + rep.queue = make([]*request, 0) + return q +} + +// forgetTopic removes topic from repository.seenTopics. For tests only! +func (rep *repository) forgetTopic(project, service, topic string) error { + rep.Lock() + key := newKey(project, service, topic) + if !rep.seenTopics[key] { + return errNotFound + } + rep.seenTopics[key] = false + rep.Unlock() + return nil +} + +type response struct { + topic *aiven.KafkaTopic + err error +} + +type request struct { + project string + service string + topic string + rsp chan *response +} + +func (r *request) key() string { + return newKey(r.project, r.service, r.topic) +} + +func (r *request) send(topic *aiven.KafkaTopic, err error) { + r.rsp <- &response{topic: topic, err: err} +} + +func newKey(parts ...string) string { + return strings.Join(parts, "/") +} + +// ForgetTopic see repository.forgetTopic +func ForgetTopic(project, service, topic string) error { + return singleRep.forgetTopic(project, service, topic) +} diff --git a/internal/sdkprovider/kafkatopicrepository/repository_test.go b/internal/sdkprovider/kafkatopicrepository/repository_test.go new file mode 100644 index 000000000..bd26ef79d --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/repository_test.go @@ -0,0 +1,285 @@ +package kafkatopicrepository + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/aiven/aiven-go-client/v2" + "github.com/stretchr/testify/assert" +) + +func TestRepository(t *testing.T) { + cases := []struct { + name string + requests []request + responses []response + storage map[string]*aiven.KafkaListTopic + v1ListErr error + v1ListCalled int32 + v2ListErr error + v2ListCalled int32 + v2ListBatchSize int + }{ + { + name: "unknown topic returns 404", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: errNotFound}, + }, + storage: make(map[string]*aiven.KafkaListTopic), + v1ListCalled: 1, + v2ListCalled: 0, // doesn't reach V2List, because "storage" doesn't return the topic + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "gets topic", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "one OK, one 404 same service", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "b", topic: "d"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {err: errNotFound}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "one OK, one 404 different services", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "d", topic: "e"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {err: errNotFound}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 2, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "two OK different services", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "d", topic: "e"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "e"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + "a/d/e": {TopicName: "e"}, + }, + v1ListCalled: 2, + v2ListCalled: 2, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "different projects, different services, multiple batches", + requests: []request{ + // Service a/a + {project: "a", service: "a", topic: "a"}, + {project: "a", service: "a", topic: "b"}, + {project: "a", service: "a", topic: "c"}, + // Service a/b + {project: "a", service: "b", topic: "a"}, + {project: "a", service: "b", topic: "b"}, + {project: "a", service: "b", topic: "c"}, + // Service b/a + {project: "b", service: "a", topic: "a"}, + {project: "b", service: "a", topic: "b"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/a/a": {TopicName: "a"}, + "a/a/b": {TopicName: "b"}, + "a/a/c": {TopicName: "c"}, + "a/b/a": {TopicName: "a"}, + "a/b/b": {TopicName: "b"}, + "a/b/c": {TopicName: "c"}, + "b/a/a": {TopicName: "a"}, + "b/a/b": {TopicName: "b"}, + }, + v1ListCalled: 3, // 3 different cervices + // 2 services has 3 topics each. + // Plus one service has 2 topics + // Gives us batches (brackets) with topics (in brackets): + // [2] + [1] + [2] + [1] + [2] + v2ListCalled: 5, + v2ListBatchSize: 2, + }, + { + name: "v1List bla bla bla error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("bla bla bla")}, + }, + v1ListErr: fmt.Errorf("bla bla bla"), + v1ListCalled: 1, + v2ListCalled: 0, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "v2List bla bla bla error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("topic read error: All attempts fail:\n#1: bla bla bla")}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v2ListErr: fmt.Errorf("bla bla bla"), + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "v2List 404 error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("topic read error: All attempts fail:\n#1: topic list has changed")}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v2ListErr: aiven.Error{Status: 404}, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + } + + for _, opt := range cases { + t.Run(opt.name, func(t *testing.T) { + client := &fakeTopicClient{ + storage: opt.storage, + v1ListErr: opt.v1ListErr, + v2ListErr: opt.v2ListErr, + } + + ctx := context.Background() + rep := newRepository(client) + rep.workerCallInterval = time.Millisecond + rep.v2ListBatchSize = opt.v2ListBatchSize + + // We must run all calls in parallel + // and then call the worker + var wg sync.WaitGroup + for i := range opt.requests { + wg.Add(1) + go func(i int) { + defer wg.Done() + topic, err := rep.Read(ctx, opt.requests[i].project, opt.requests[i].service, opt.requests[i].topic) + assert.Equal(t, opt.responses[i].topic, topic) + assert.Equal(t, opt.responses[i].err, err) + }(i) + } + + go rep.worker() + wg.Wait() + + assert.Equal(t, opt.v1ListCalled, client.v1ListCalled) + assert.Equal(t, opt.v2ListCalled, client.v2ListCalled) + }) + } +} + +var _ topicsClient = &fakeTopicClient{} + +type fakeTopicClient struct { + storage map[string]*aiven.KafkaListTopic // key project/service/topic + createErr error + deleteErr error + v1ListErr error + v2ListErr error + // counters + createCalled int32 + deleteCalled int32 + v1ListCalled int32 + v2ListCalled int32 +} + +func (f *fakeTopicClient) Create(context.Context, string, string, aiven.CreateKafkaTopicRequest) error { + time.Sleep(time.Millisecond * 100) // we need some lag to simulate races + atomic.AddInt32(&f.createCalled, 1) + return f.createErr +} + +func (f *fakeTopicClient) Update(context.Context, string, string, string, aiven.UpdateKafkaTopicRequest) error { + panic("implement me") +} + +func (f *fakeTopicClient) Delete(context.Context, string, string, string) error { + atomic.AddInt32(&f.deleteCalled, 1) + return f.deleteErr +} + +func (f *fakeTopicClient) List(_ context.Context, project, service string) ([]*aiven.KafkaListTopic, error) { + atomic.AddInt32(&f.v1ListCalled, 1) + key := newKey(project, service) + "/" + result := make([]*aiven.KafkaListTopic, 0) + for k, v := range f.storage { + if strings.HasPrefix(k, key) { + result = append(result, v) + } + } + return result, f.v1ListErr +} + +func (f *fakeTopicClient) V2List(_ context.Context, project, service string, topicNames []string) ([]*aiven.KafkaTopic, error) { + atomic.AddInt32(&f.v2ListCalled, 1) + result := make([]*aiven.KafkaTopic, 0) + for _, n := range topicNames { + v, ok := f.storage[newKey(project, service, n)] + if ok { + result = append(result, &aiven.KafkaTopic{TopicName: v.TopicName}) + } + } + return result, f.v2ListErr +} diff --git a/internal/sdkprovider/kafkatopicrepository/update.go b/internal/sdkprovider/kafkatopicrepository/update.go new file mode 100644 index 000000000..59d6ace0d --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/update.go @@ -0,0 +1,11 @@ +package kafkatopicrepository + +import ( + "context" + + "github.com/aiven/aiven-go-client/v2" +) + +func (rep *repository) Update(ctx context.Context, project, service, topic string, req aiven.UpdateKafkaTopicRequest) error { + return rep.client.Update(ctx, project, service, topic, req) +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 49075d5da..5b3c24d4b 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -3,19 +3,17 @@ package kafkatopic import ( "context" "errors" - "fmt" "log" - "time" "github.com/aiven/aiven-go-client/v2" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/aiven/terraform-provider-aiven/internal/schemautil" "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig" "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig/stateupgrader" + "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" ) var aivenKafkaTopicSchema = map[string]*schema.Schema{ @@ -266,21 +264,6 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int partitions := d.Get("partitions").(int) replication := d.Get("replication").(int) - // aiven.KafkaTopics.Create() function may return 501 on create - // Second call might say that topic already exists - // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) - - // No error means topic exists - if err == nil { - return diag.Errorf("Topic conflict, already exists: %s", topicName) - } - - // If this is not "does not exist", then something happened - if !aiven.IsNotFound(err) { - return diag.FromErr(err) - } - createRequest := aiven.CreateKafkaTopicRequest{ Partitions: &partitions, Replication: &replication, @@ -289,21 +272,14 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - err = m.(*aiven.Client).KafkaTopics.Create( - ctx, - project, - serviceName, - createRequest, - ) - if err != nil && !aiven.IsAlreadyExists(err) { + client := m.(*aiven.Client) + err := kafkatopicrepository.New(client.KafkaTopics).Create(ctx, project, serviceName, createRequest) + if err != nil { return diag.FromErr(err) } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) - // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field // in the future, a read operation should be called after creation. @@ -370,7 +346,8 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter return diag.FromErr(err) } - topic, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) + client := m.(*aiven.Client) + topic, err := kafkatopicrepository.New(client.KafkaTopics).Read(ctx, project, serviceName, topicName) // Topics are destroyed when kafka is off // https://docs.aiven.io/docs/platform/concepts/service-power-cycle @@ -441,40 +418,15 @@ func flattenKafkaTopicTags(list []aiven.KafkaTopicTag) []map[string]interface{} return tags } -func getTopic(ctx context.Context, m interface{}, timeout time.Duration, project, serviceName, topicName string) (*aiven.KafkaTopic, error) { - client, ok := m.(*aiven.Client) - if !ok { - return nil, fmt.Errorf("invalid Aiven client") - } - - w, err := newKafkaTopicAvailabilityWaiter(ctx, client, project, serviceName, topicName) - if err != nil { - return nil, err - } - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - topic, err := w.Conf(timeout).WaitForStateContext(ctx) - if err != nil { - return nil, err - } - - kt, ok := topic.(aiven.KafkaTopic) - if !ok { - return nil, fmt.Errorf("can't cast value to aiven.KafkaTopic") - } - return &kt, nil -} - func resourceKafkaTopicUpdate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { - client := m.(*aiven.Client) - partitions := d.Get("partitions").(int) projectName, serviceName, topicName, err := schemautil.SplitResourceID3(d.Id()) if err != nil { return diag.FromErr(err) } - err = client.KafkaTopics.Update( + client := m.(*aiven.Client) + err = kafkatopicrepository.New(client.KafkaTopics).Update( ctx, projectName, serviceName, @@ -494,8 +446,6 @@ func resourceKafkaTopicUpdate(ctx context.Context, d *schema.ResourceData, m int } func resourceKafkaTopicDelete(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { - client := m.(*aiven.Client) - projectName, serviceName, topicName, err := schemautil.SplitResourceID3(d.Id()) if err != nil { return diag.FromErr(err) @@ -505,18 +455,7 @@ func resourceKafkaTopicDelete(ctx context.Context, d *schema.ResourceData, m int return diag.Errorf("cannot delete kafka topic when termination_protection is enabled") } - waiter := TopicDeleteWaiter{ - Context: ctx, - Client: client, - ProjectName: projectName, - ServiceName: serviceName, - TopicName: topicName, - } - - timeout := d.Timeout(schema.TimeoutDelete) - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = waiter.Conf(timeout).WaitForStateContext(ctx) + err = kafkatopicrepository.New(m.(*aiven.Client).KafkaTopics).Delete(ctx, projectName, serviceName, topicName) if err != nil { return diag.Errorf("error waiting for Aiven Kafka Topic to be DELETED: %s", err) } @@ -553,42 +492,3 @@ func flattenKafkaTopicConfig(t *aiven.KafkaTopic) []map[string]interface{} { }, } } - -// TopicDeleteWaiter is used to wait for Kafka Topic to be deleted. -type TopicDeleteWaiter struct { - Context context.Context - Client *aiven.Client - ProjectName string - ServiceName string - TopicName string -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *TopicDeleteWaiter) RefreshFunc() resource.StateRefreshFunc { - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Delete(w.Context, w.ProjectName, w.ServiceName, w.TopicName) - if err != nil { - if !aiven.IsNotFound(err) { - return nil, "REMOVING", nil - } - } - - return aiven.KafkaTopic{}, "DELETED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *TopicDeleteWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Delete waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"REMOVING"}, - Target: []string{"DELETED"}, - Refresh: w.RefreshFunc(), - Delay: 1 * time.Second, - Timeout: timeout, - MinTimeout: 1 * time.Second, - } -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go deleted file mode 100644 index 932759eb0..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ /dev/null @@ -1,205 +0,0 @@ -package kafkatopic - -import ( - "log" - "sync" - - "github.com/aiven/aiven-go-client/v2" - "golang.org/x/exp/slices" -) - -var topicCache = newTopicCache() - -// kafkaTopicCache represents Kafka Topics cache based on Service and Project identifiers -type kafkaTopicCache struct { - sync.RWMutex - internal map[string]map[string]aiven.KafkaTopic - inQueue map[string][]string - missing map[string][]string - v1list map[string][]string -} - -// newTopicCache creates new instance of Kafka Topic Cache -func newTopicCache() *kafkaTopicCache { - return &kafkaTopicCache{ - internal: make(map[string]map[string]aiven.KafkaTopic), - inQueue: make(map[string][]string), - missing: make(map[string][]string), - v1list: make(map[string][]string), - } -} - -// getTopicCache gets a global Kafka Topics Cache -func getTopicCache() *kafkaTopicCache { - return topicCache -} - -// LoadByProjectAndServiceName returns a list of Kafka Topics stored in the cache for a given Project -// and Service names, or nil if no value is present. -// The ok result indicates whether value was found in the map. -func (t *kafkaTopicCache) LoadByProjectAndServiceName(projectName, serviceName string) (map[string]aiven.KafkaTopic, bool) { - t.RLock() - result, ok := t.internal[projectName+serviceName] - t.RUnlock() - - return result, ok -} - -// LoadByTopicName returns a list of Kafka Topics stored in the cache for a given Project -// and Service names, or nil if no value is present. -// The ok result indicates whether value was found in the map. -func (t *kafkaTopicCache) LoadByTopicName(projectName, serviceName, topicName string) (aiven.KafkaTopic, bool) { - t.RLock() - defer t.RUnlock() - - topics, ok := t.internal[projectName+serviceName] - if !ok { - return aiven.KafkaTopic{State: "CONFIGURING"}, false - } - - result, ok := topics[topicName] - if !ok { - result.State = "CONFIGURING" - } - - log.Printf("[TRACE] retrieving from a topic cache `%+#v` for a topic name `%s`", result, topicName) - - return result, ok -} - -// DeleteByProjectAndServiceName deletes the cache value for a key which is a combination of Project -// and Service names. -func (t *kafkaTopicCache) DeleteByProjectAndServiceName(projectName, serviceName string) { - t.Lock() - delete(t.internal, projectName+serviceName) - t.Unlock() -} - -// StoreByProjectAndServiceName sets the values for a Project name and Service name key. -func (t *kafkaTopicCache) StoreByProjectAndServiceName(projectName, serviceName string, list []*aiven.KafkaTopic) { - if len(list) == 0 { - return - } - - log.Printf("[DEBUG] Updating Kafka Topic cache for project %s and service %s ...", projectName, serviceName) - - for _, topic := range list { - t.Lock() - if _, ok := t.internal[projectName+serviceName]; !ok { - t.internal[projectName+serviceName] = make(map[string]aiven.KafkaTopic) - } - t.internal[projectName+serviceName][topic.TopicName] = *topic - - // when topic is added to cache, it need to be deleted from the queue - for i, name := range t.inQueue[projectName+serviceName] { - if name == topic.TopicName { - t.inQueue[projectName+serviceName] = append(t.inQueue[projectName+serviceName][:i], t.inQueue[projectName+serviceName][i+1:]...) - } - } - - t.Unlock() - } -} - -// AddToQueue adds a topic name to a queue of topics to be found -func (t *kafkaTopicCache) AddToQueue(projectName, serviceName, topicName string) { - var isFound bool - - t.Lock() - // check if topic is already in the queue - for _, name := range t.inQueue[projectName+serviceName] { - if name == topicName { - isFound = true - } - } - - _, inCache := t.internal[projectName+serviceName][topicName] - // the only topic that is not in the queue nor inside cache can be added to the queue - if !isFound && !inCache { - t.inQueue[projectName+serviceName] = append(t.inQueue[projectName+serviceName], topicName) - } - t.Unlock() -} - -// DeleteFromQueueAndMarkMissing topic from the queue and marks it as missing -func (t *kafkaTopicCache) DeleteFromQueueAndMarkMissing(projectName, serviceName, topicName string) { - t.Lock() - for k, name := range t.inQueue[projectName+serviceName] { - if name == topicName { - t.inQueue[projectName+serviceName] = slices.Delete(t.inQueue[projectName+serviceName], k, k+1) - } - } - - t.missing[projectName+serviceName] = append(t.missing[projectName+serviceName], topicName) - t.Unlock() -} - -// GetMissing retrieves a list of missing topics -func (t *kafkaTopicCache) GetMissing(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - return t.missing[projectName+serviceName] -} - -// GetQueue retrieves a topics queue, retrieves up to 100 first elements -func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - if len(t.inQueue[projectName+serviceName]) >= 100 { - return t.inQueue[projectName+serviceName][:99] - } - - return t.inQueue[projectName+serviceName] -} - -// SetV1List sets v1 topics list -func (t *kafkaTopicCache) SetV1List(projectName, serviceName string, list []*aiven.KafkaListTopic) { - t.Lock() - for _, v := range list { - t.v1list[projectName+serviceName] = append(t.v1list[projectName+serviceName], v.TopicName) - } - t.Unlock() -} - -// GetV1List retrieves a list of V1 kafka topic names -func (t *kafkaTopicCache) GetV1List(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - return t.v1list[projectName+serviceName] -} - -// DeleteTopicFromCache Invalidates cache for the topic -// This function only exists to pass acceptance tests. Cache invalidation -// happens automatically in Terraform when used in the real-life world between -// each subsequent operation. However, during the acceptance test execution, -// we need to mimic the cache invalidation mechanism by calling this function. -func DeleteTopicFromCache(projectName, serviceName, topicName string) { - t := getTopicCache() - t.Lock() - key := projectName + serviceName - for k, name := range t.missing[key] { - if name == topicName { - if l := len(t.missing[key]); k+1 > l { - t.missing[key] = t.missing[key][:l-1] - continue - } - t.missing[key] = slices.Delete(t.missing[key], k, k+1) - } - } - for k, name := range t.v1list[key] { - if name == topicName { - if l := len(t.v1list[key]); k+1 > l { - t.v1list[key] = t.v1list[key][:l-1] - continue - } - t.v1list[key] = slices.Delete(t.v1list[key], k, k+1) - } - } - if t.internal[key] != nil { - delete(t.internal[key], topicName) - } - t.Unlock() -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go deleted file mode 100644 index c21b03040..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go +++ /dev/null @@ -1,192 +0,0 @@ -package kafkatopic - -import ( - "reflect" - "testing" - - "github.com/aiven/aiven-go-client/v2" -) - -func TestTopicCache_LoadByProjectAndServiceName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - want map[string]aiven.KafkaTopic - want1 bool - }{ - { - "not_found", - func(*kafkaTopicCache) { - }, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - nil, - false, - }, - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - map[string]aiven.KafkaTopic{ - "topic-1": { - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - "topic-2": { - Replication: 1, - State: "AVAILABLE", - TopicName: "topic-2", - }, - }, - true, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if !reflect.DeepEqual(got, tt.want) { - t1.Errorf("LoadByProjectAndServiceName() got = %v, want %v", got, tt.want) - } - if got1 != tt.want1 { - t1.Errorf("LoadByProjectAndServiceName() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} - -func TestTopicCache_LoadByTopicName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - topicName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - want aiven.KafkaTopic - want1 bool - }{ - { - "not_found", - func(*kafkaTopicCache) { - - }, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - topicName: "topic-1", - }, - aiven.KafkaTopic{ - State: "CONFIGURING", - }, - false, - }, - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - topicName: "topic-1", - }, - aiven.KafkaTopic{ - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - true, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByTopicName(tt.args.projectName, tt.args.serviceName, tt.args.topicName) - if !reflect.DeepEqual(got, tt.want) { - t1.Errorf("LoadByTopicName() got = %v, want %v", got, tt.want) - } - if got1 != tt.want1 { - t1.Errorf("LoadByTopicName() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} - -func TestTopicCache_DeleteByProjectAndServiceName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - }{ - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if len(got) == 0 { - t1.Errorf("LoadByProjectAndServiceName() got = %v", got) - } - if got1 != true { - t1.Errorf("LoadByProjectAndServiceName() got1 = %v", got1) - } - - t.DeleteByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - - got, got1 = t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if len(got) != 0 { - t1.Errorf("After deletion LoadByProjectAndServiceName() should be empty, got = %v", got) - } - if got1 != false { - t1.Errorf("After deletion LoadByProjectAndServiceName() got1 whould be false = %v", got1) - } - }) - } -} - -func testAddTwoTopicsToCache(c *kafkaTopicCache) { - c.StoreByProjectAndServiceName( - "test-pr1", - "test-sr1", - []*aiven.KafkaTopic{ - { - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - { - Replication: 1, - State: "AVAILABLE", - TopicName: "topic-2", - }, - }) -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index d0813db82..f183b743f 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -19,7 +19,7 @@ import ( acc "github.com/aiven/terraform-provider-aiven/internal/acctest" "github.com/aiven/terraform-provider-aiven/internal/schemautil" - "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/service/kafkatopic" + "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" ) func TestAccAivenKafkaTopic_basic(t *testing.T) { @@ -392,8 +392,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { assert.Nil(t, tc) assert.True(t, aiven.IsNotFound(err)) - // Invalidates cache for the topic - kafkatopic.DeleteTopicFromCache(project, kafkaName, topicName) + // We need to remove it from reps cache + assert.NoError(t, kafkatopicrepository.ForgetTopic(project, kafkaName, topicName)) }, // Now plan shows a diff ExpectNonEmptyPlan: true, @@ -535,7 +535,7 @@ func TestAccAivenKafkaTopic_conflicts_if_exists(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccAivenKafkaTopicConflictsIfExists(prefix, project), - ExpectError: regexp.MustCompile(`Topic conflict, already exists: conflict`), + ExpectError: regexp.MustCompile(`Topic conflict, already exists`), }, }, }) diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go deleted file mode 100644 index e93357936..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ /dev/null @@ -1,141 +0,0 @@ -package kafkatopic - -import ( - "context" - "fmt" - "log" - "time" - - "github.com/aiven/aiven-go-client/v2" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" - "golang.org/x/exp/slices" - "golang.org/x/sync/semaphore" -) - -// kafkaTopicAvailabilityWaiter is used to refresh the Aiven Kafka Topic endpoints when -// provisioning. -type kafkaTopicAvailabilityWaiter struct { - Context context.Context - Client *aiven.Client - Project string - ServiceName string - TopicName string -} - -var kafkaTopicAvailabilitySem = semaphore.NewWeighted(1) - -func newKafkaTopicAvailabilityWaiter( - ctx context.Context, - client *aiven.Client, - project string, - serviceName string, - topicName string, -) (*kafkaTopicAvailabilityWaiter, error) { - if len(project)*len(serviceName)*len(topicName) == 0 { - return nil, fmt.Errorf("return invalid input: project=%q, serviceName=%q, topicName=%q", project, serviceName, topicName) - } - return &kafkaTopicAvailabilityWaiter{ - Context: ctx, - Client: client, - Project: project, - ServiceName: serviceName, - TopicName: topicName, - }, nil -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { - return func() (interface{}, string, error) { - cache := getTopicCache() - - // Checking if the topic is in the missing list. If so, trowing 404 error - if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { - return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("topic `%s` is not found", w.TopicName)} - } - - topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - if err := w.refresh(); err != nil { - return nil, "CONFIGURING", err - } - - topic, ok = cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - return nil, "CONFIGURING", nil - } - } - - log.Printf("[DEBUG] Got `%s` state while waiting for topic `%s` to be up.", topic.State, w.TopicName) - - return topic, topic.State, nil - } -} - -func (w *kafkaTopicAvailabilityWaiter) refresh() error { - c := getTopicCache() - - if !kafkaTopicAvailabilitySem.TryAcquire(1) { - log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") - return nil - } - defer kafkaTopicAvailabilitySem.Release(1) - - // check if topic is already in cache - if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok { - return nil - } - - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) - - queue := c.GetQueue(w.Project, w.ServiceName) - if len(queue) == 0 { - return nil - } - - log.Printf("[DEBUG] kakfa topic queue : %+v", queue) - v2Topics, err := w.Client.KafkaTopics.V2List(w.Context, w.Project, w.ServiceName, queue) - if err != nil { - // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch - // do not exist but does not say which ones are missing. Therefore, we need to - // identify the none existing topics. - if aiven.IsNotFound(err) { - log.Printf("[DEBUG] v2 list 404 error, queue : %+v, error: %s", queue, err) - - list, err := w.Client.KafkaTopics.List(w.Context, w.Project, w.ServiceName) - if err != nil { - return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - log.Printf("[DEBUG] v1 list results : %+v", list) - c.SetV1List(w.Project, w.ServiceName, list) - - // If topic is missing in V1 list then it does not exist, flagging it as missing - for _, t := range queue { - if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { - c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) - } - } - return nil - } - return err - } - - c.StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) - - return nil -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicAvailabilityWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Kafka Topic availability waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CONFIGURING"}, - Target: []string{"ACTIVE"}, - Refresh: w.RefreshFunc(), - Timeout: timeout, - PollInterval: 5 * time.Second, - NotFoundChecks: 100, - } -}