diff --git a/examples_tests/base.go b/examples_tests/base.go index fa5144417..02a726b7f 100644 --- a/examples_tests/base.go +++ b/examples_tests/base.go @@ -64,6 +64,8 @@ func (s *BaseTestSuite) TearDownSuite() { } // withDefaults adds default options for terraform test +// +//lint:ignore U1000 Ignore unused function. Used in child structs func (s *BaseTestSuite) withDefaults(opts *terraform.Options) *terraform.Options { // No need to use lock file for dev build opts.Lock = false diff --git a/go.mod b/go.mod index a2fd9831f..a9038c410 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ go 1.19 require ( github.com/aiven/aiven-go-client v1.36.0 + github.com/avast/retry-go v3.0.0+incompatible github.com/dave/jennifer v1.7.0 github.com/docker/go-units v0.5.0 github.com/ettle/strcase v0.1.1 @@ -15,9 +16,9 @@ require ( github.com/hashicorp/terraform-plugin-mux v0.11.2 github.com/hashicorp/terraform-plugin-sdk/v2 v2.28.0 github.com/kelseyhightower/envconfig v1.4.0 + github.com/samber/lo v1.38.1 github.com/stretchr/testify v1.8.4 golang.org/x/exp v0.0.0-20230809150735-7b3493d9a819 - golang.org/x/sync v0.3.0 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index b642b0806..219ea6e59 100644 --- a/go.sum +++ b/go.sum @@ -206,6 +206,8 @@ github.com/apparentlymart/go-textseg v1.0.0/go.mod h1:z96Txxhf3xSFMPmb5X/1W05FF/ github.com/apparentlymart/go-textseg/v12 v12.0.0/go.mod h1:S/4uRK2UtaQttw1GenVJEynmyUenKwP++x/+DdGV/Ec= github.com/apparentlymart/go-textseg/v13 v13.0.0 h1:Y+KvPE1NYz0xl601PVImeQfFyEy6iT90AvPUL1NNfNw= github.com/apparentlymart/go-textseg/v13 v13.0.0/go.mod h1:ZK2fH7c4NqDTLtiYLvIkEghdlcqw7yxLeM89kiTRPUo= +github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0= +github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY= github.com/aws/aws-sdk-go v1.44.122 h1:p6mw01WBaNpbdP2xrisz5tIkcNwzj/HysobNoaAHjgo= github.com/aws/aws-sdk-go v1.44.122/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo= github.com/bgentry/go-netrc v0.0.0-20140422174119-9fd32a8b3d3d h1:xDfNPAt8lFiC1UJrqV3uuy861HCTo708pDMbjHHdCas= @@ -496,6 +498,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/samber/lo v1.38.1 h1:j2XEAqXKb09Am4ebOg31SpvzUTTs6EN3VfgeLUhPdXM= +github.com/samber/lo v1.38.1/go.mod h1:+m/ZKRl6ClXCE2Lgf3MsQlWfh4bn1bz6CXEOxnEXnEA= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/sergi/go-diff v1.2.0 h1:XU+rvMAioB0UC3q1MFrIQy4Vo5/4VsRDQQXHsEya6xQ= github.com/skeema/knownhosts v1.1.0 h1:Wvr9V0MxhjRbl3f9nMnKnFfiWTJmtECJ9Njkea3ysW0= @@ -691,8 +695,6 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220929204114-8fcdb60fdcc0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/sdkprovider/kafkatopicrepository/create.go b/internal/sdkprovider/kafkatopicrepository/create.go new file mode 100644 index 000000000..979763e36 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/create.go @@ -0,0 +1,29 @@ +package kafkatopicrepository + +import ( + "context" + "time" + + "github.com/aiven/aiven-go-client" + "github.com/avast/retry-go" +) + +func (rep *repo) Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error { + return retry.Do(func() error { + err := rep.client.Create(project, service, req) + if err == nil || aiven.IsAlreadyExists(err) { + rep.Lock() + rep.cachedTopics[newKey(project, service, req.TopicName)] = true + rep.Unlock() + return nil + } + + // If some brokers are offline while the request is being executed + // the operation may fail. + _, ok := err.(aiven.Error) + if ok { + return err + } + return retry.Unrecoverable(err) + }, retry.Context(ctx), retry.Delay(10*time.Second)) +} diff --git a/internal/sdkprovider/kafkatopicrepository/delete.go b/internal/sdkprovider/kafkatopicrepository/delete.go new file mode 100644 index 000000000..0878a2acf --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/delete.go @@ -0,0 +1,18 @@ +package kafkatopicrepository + +import ( + "context" + + "github.com/aiven/aiven-go-client" +) + +func (rep *repo) Delete(_ context.Context, project, service, topic string) error { + err := rep.client.Delete(project, service, topic) + if !(err == nil || aiven.IsNotFound(err)) { + return err + } + rep.Lock() + delete(rep.cachedTopics, newKey(project, service, topic)) + rep.Unlock() + return nil +} diff --git a/internal/sdkprovider/kafkatopicrepository/read.go b/internal/sdkprovider/kafkatopicrepository/read.go new file mode 100644 index 000000000..1bed4cdf0 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/read.go @@ -0,0 +1,168 @@ +package kafkatopicrepository + +import ( + "context" + "fmt" + + "github.com/aiven/aiven-go-client" + "github.com/avast/retry-go" + "github.com/samber/lo" +) + +func (rep *repo) Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) { + c := make(chan *response, 1) + req := &request{ + project: project, + service: service, + topic: topic, + rsp: c, + } + + // We can speed up here: + // if service has been cached we know if topic exists + serviceKey := newKey(project, service) + if rep.cachedServices[serviceKey] && !rep.cachedTopics[req.key()] { + return nil, errNotFound + } + + // Adds request to the queue + rep.Lock() + rep.queue = append(rep.queue, req) + rep.Unlock() + + // Waits response from channel + // Or exists on context done + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rsp := <-c: + close(c) + return rsp.topic, rsp.err + } +} + +// fetch fetches requested topics +// 1. groups topics by service +// 2. removes "not found" topics +// 3. requests "found" topics (in chunks) +func (rep *repo) fetch(queue map[string]*request) { + // Groups topics by service + rawByService := make(map[string][]*request, 0) + for i := range queue { + r := queue[i] + key := newKey(r.project, r.service) + rawByService[key] = append(rawByService[key], r) + } + + for k, reqs := range rawByService { + project := reqs[0].project + service := reqs[0].service + + // If service has been cached, we don't call v1List + serviceKey := newKey(project, service) + if rep.cachedServices[serviceKey] { + continue + } + + // Requests all topics for the service + list, err := rep.client.List(project, service) + if err != nil { + for _, r := range reqs { + r.send(nil, err) + } + + // It failed the whole group + delete(rawByService, k) + continue + } + + // We cache all service topics + rep.Lock() + for _, t := range list { + rep.cachedTopics[newKey(project, service, t.TopicName)] = true + } + + // Service is cached + rep.cachedServices[serviceKey] = true + rep.Unlock() + } + + // Removes "not found" topics from the list + // If we call V2List with at least one "not found" topic, + // it will return 404 for all topics + foundByService := make([]map[string]*request, 0, len(rawByService)) + for _, reqs := range rawByService { + // Send "not found" and gathers "found" + found := make(map[string]*request, len(reqs)) + for i := range reqs { + r := reqs[i] + if rep.cachedTopics[r.key()] { + found[r.topic] = r + } else { + r.send(nil, errNotFound) + } + } + + // If any exists + if len(found) != 0 { + foundByService = append(foundByService, found) + } + } + + // Fetches "found" topics configuration + for _, reqs := range foundByService { + topicNames := make([]string, 0, len(reqs)) + for _, r := range reqs { + topicNames = append(topicNames, r.topic) + } + + // foundByService are grouped by service + // We can share this values + project := reqs[topicNames[0]].project + service := reqs[topicNames[0]].service + + // Slices topic names by rep.v2ListBatchSize + // because V2List has a limit + for _, chunk := range lo.Chunk(topicNames, rep.v2ListBatchSize) { + // V2List() and Get() do not get info immediately + // Some retries should be applied + var list []*aiven.KafkaTopic + err := retry.Do(func() error { + rspList, err := rep.client.V2List(project, service, chunk) + + // 404 means that there is "not found" on the list + // But previous step should have cleaned it, so now this is a fail + if aiven.IsNotFound(err) { + return retry.Unrecoverable(fmt.Errorf("topic list has changed")) + } + + // Something else happened + // We have retries in the client, so this is bad + if err != nil { + return retry.Unrecoverable(err) + } + + // This is an old cachedTopics, we need to retry until succeed + if len(rspList) != len(chunk) { + return fmt.Errorf("got %d topics, expected %d. Retrying", len(rspList), len(chunk)) + } + + list = rspList + return nil + }, retry.Delay(rep.v2ListRetryDelay)) + + if err != nil { + // Send errors + for _, r := range reqs { + // Flattens error to string, because it might go really completed for testing + r.send(nil, fmt.Errorf("topic read error: %s", err)) + } + } else { + // Sends topics + for _, t := range list { + reqs[t.TopicName].send(t, nil) + } + } + } + } +} diff --git a/internal/sdkprovider/kafkatopicrepository/repository.go b/internal/sdkprovider/kafkatopicrepository/repository.go new file mode 100644 index 000000000..eb21c112a --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/repository.go @@ -0,0 +1,163 @@ +package kafkatopicrepository + +import ( + "context" + "net/http" + "strings" + "sync" + "time" + + "github.com/aiven/aiven-go-client" +) + +var ( + initOnce sync.Once + singleRep = &repo{} + errNotFound = aiven.Error{Status: http.StatusNotFound, Message: "Topic not found"} +) + +const ( + // defaultV2ListBatchSize the max size of batch to call V2List + defaultV2ListBatchSize = 100 + + // defaultV2ListRetryDelay V2List caches results, so we retry it by this delay + defaultV2ListRetryDelay = 5 * time.Second + + // defaultWorkerCallInterval how often worker should run + defaultWorkerCallInterval = time.Second + defaultTopicCacheSize = 1000 + defaultServiceCacheSize = 10 +) + +// Repository CRUD interface for topics +type Repository interface { + Create(ctx context.Context, project, service string, req aiven.CreateKafkaTopicRequest) error + Read(ctx context.Context, project, service, topic string) (*aiven.KafkaTopic, error) + Update(ctx context.Context, project, service, topic string, req aiven.UpdateKafkaTopicRequest) error + Delete(ctx context.Context, project, service, topic string) error +} + +// topicsClient interface for unit tests +type topicsClient interface { + // List returns existing topics + List(project, service string) ([]*aiven.KafkaListTopic, error) + // V2List returns requested topics configuration + V2List(project, service string, topicNames []string) ([]*aiven.KafkaTopic, error) + + // Create creates topic with retries + Create(project, service string, req aiven.CreateKafkaTopicRequest) error + + // Update updates topic + Update(project, service, topic string, req aiven.UpdateKafkaTopicRequest) error + + // Delete deletes topic + Delete(project, service, topic string) error +} + +// NewRepository returns process singleton Repository +func NewRepository(client topicsClient) Repository { + initOnce.Do(func() { + singleRep = newRepo(client) + go singleRep.worker() + }) + return singleRep +} + +func newRepo(client topicsClient) *repo { + r := &repo{ + client: client, + cachedTopics: make(map[string]bool, defaultTopicCacheSize), + cachedServices: make(map[string]bool, defaultServiceCacheSize), + v2ListBatchSize: defaultV2ListBatchSize, + v2ListRetryDelay: defaultV2ListRetryDelay, + workerCallInterval: defaultWorkerCallInterval, + } + return r +} + +type repo struct { + sync.Mutex + client topicsClient + queue []*request + v2ListBatchSize int + v2ListRetryDelay time.Duration + workerCallInterval time.Duration + // cachedTopics stores topic names + // because V1List might not return fresh topics + cachedTopics map[string]bool + + // cachedServices once service topics are cached, we never cache then again. + // When Terraform runs in parallel + // this repository get N (default by CPU) topics to fetch + // Which makes calling V1List for every N topics. + // If we have 1000 topics and 6 CPUs, that will make too much useless calls. + // So we cache v1List topics to cachedTopics and mark the service being cached here. + // Then we look for topics in cachedTopics, and never call v1List anymore + cachedServices map[string]bool +} + +// worker runs in background and processes the queue +func (rep *repo) worker() { + ticker := time.NewTicker(rep.workerCallInterval) + for { + <-ticker.C + b := rep.withdraw() + if b != nil { + rep.fetch(b) + } + } +} + +// withdraw returns the queue and cleans it +func (rep *repo) withdraw() map[string]*request { + rep.Lock() + defer rep.Unlock() + + if len(rep.queue) == 0 { + return nil + } + + q := make(map[string]*request, len(rep.queue)) + for _, r := range rep.queue { + q[r.key()] = r + } + + rep.queue = make([]*request, 0) + return q +} + +func newKey(parts ...string) string { + return strings.Join(parts, "/") +} + +type response struct { + topic *aiven.KafkaTopic + err error +} + +type request struct { + project string + service string + topic string + rsp chan *response +} + +func (r *request) key() string { + return newKey(r.project, r.service, r.topic) +} + +func (r *request) send(topic *aiven.KafkaTopic, err error) { + r.rsp <- &response{topic: topic, err: err} +} + +// DeleteTopicCache removes topic from internal cachedTopics. For tests only! +func DeleteTopicCache(project, service, topic string) error { + singleRep.Lock() + key := newKey(project, service, topic) + if !singleRep.cachedTopics[key] { + return errNotFound + } + singleRep.cachedTopics[key] = false + singleRep.Unlock() + return nil +} diff --git a/internal/sdkprovider/kafkatopicrepository/repository_test.go b/internal/sdkprovider/kafkatopicrepository/repository_test.go new file mode 100644 index 000000000..e40f311fb --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/repository_test.go @@ -0,0 +1,278 @@ +package kafkatopicrepository + +import ( + "context" + "fmt" + "strings" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/aiven/aiven-go-client" + "github.com/stretchr/testify/assert" +) + +func TestRepository(t *testing.T) { + cases := []struct { + name string + requests []request + responses []response + storage map[string]*aiven.KafkaListTopic + v1ListErr error + v1ListCalled int32 + v2ListErr error + v2ListCalled int32 + v2ListBatchSize int + }{ + { + name: "unknown topic returns 404", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: errNotFound}, + }, + storage: make(map[string]*aiven.KafkaListTopic), + v1ListCalled: 1, + v2ListCalled: 0, // doesn't reach V2List, because "storage" doesn't return the topic + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "gets topic", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "one OK, one 404 same service", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "b", topic: "d"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {err: errNotFound}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "one OK, one 404 different services", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "d", topic: "e"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {err: errNotFound}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v1ListCalled: 2, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "two OK different services", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + {project: "a", service: "d", topic: "e"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "e"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + "a/d/e": {TopicName: "e"}, + }, + v1ListCalled: 2, + v2ListCalled: 2, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "different projects, different services, multiple batches", + requests: []request{ + // Service a/a + {project: "a", service: "a", topic: "a"}, + {project: "a", service: "a", topic: "b"}, + {project: "a", service: "a", topic: "c"}, + // Service a/b + {project: "a", service: "b", topic: "a"}, + {project: "a", service: "b", topic: "b"}, + {project: "a", service: "b", topic: "c"}, + // Service b/a + {project: "b", service: "a", topic: "a"}, + {project: "b", service: "a", topic: "b"}, + }, + responses: []response{ + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + {topic: &aiven.KafkaTopic{TopicName: "c"}}, + {topic: &aiven.KafkaTopic{TopicName: "a"}}, + {topic: &aiven.KafkaTopic{TopicName: "b"}}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/a/a": {TopicName: "a"}, + "a/a/b": {TopicName: "b"}, + "a/a/c": {TopicName: "c"}, + "a/b/a": {TopicName: "a"}, + "a/b/b": {TopicName: "b"}, + "a/b/c": {TopicName: "c"}, + "b/a/a": {TopicName: "a"}, + "b/a/b": {TopicName: "b"}, + }, + v1ListCalled: 3, // 3 different cervices + // 2 services has 3 topics each. + // Plus one service has 2 topics + // Gives us batches (brackets) with topics (in brackets): + // [2] + [1] + [2] + [1] + [2] + v2ListCalled: 5, + v2ListBatchSize: 2, + }, + { + name: "v1List bla bla bla error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("bla bla bla")}, + }, + v1ListErr: fmt.Errorf("bla bla bla"), + v1ListCalled: 1, + v2ListCalled: 0, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "v2List bla bla bla error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("topic read error: All attempts fail:\n#1: bla bla bla")}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v2ListErr: fmt.Errorf("bla bla bla"), + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + { + name: "v2List 404 error", + requests: []request{ + {project: "a", service: "b", topic: "c"}, + }, + responses: []response{ + {err: fmt.Errorf("topic read error: All attempts fail:\n#1: topic list has changed")}, + }, + storage: map[string]*aiven.KafkaListTopic{ + "a/b/c": {TopicName: "c"}, + }, + v2ListErr: aiven.Error{Status: 404}, + v1ListCalled: 1, + v2ListCalled: 1, + v2ListBatchSize: defaultV2ListBatchSize, + }, + } + + for _, opt := range cases { + t.Run(opt.name, func(t *testing.T) { + client := &fakeTopicClient{ + storage: opt.storage, + v1ListErr: opt.v1ListErr, + v2ListErr: opt.v2ListErr, + } + + ctx := context.Background() + rep := newRepo(client) + rep.workerCallInterval = time.Millisecond + rep.v2ListBatchSize = opt.v2ListBatchSize + + // We must run all calls in parallel + // and then call the worker + var wg sync.WaitGroup + for i := range opt.requests { + wg.Add(1) + go func(i int) { + defer wg.Done() + topic, err := rep.Read(ctx, opt.requests[i].project, opt.requests[i].service, opt.requests[i].topic) + assert.Equal(t, opt.responses[i].topic, topic) + assert.Equal(t, opt.responses[i].err, err) + }(i) + } + + go rep.worker() + wg.Wait() + + assert.Equal(t, opt.v1ListCalled, client.v1ListCalled) + assert.Equal(t, opt.v2ListCalled, client.v2ListCalled) + }) + } +} + +var _ topicsClient = &fakeTopicClient{} + +type fakeTopicClient struct { + storage map[string]*aiven.KafkaListTopic // key project/service/topic + v1ListErr error + v2ListErr error + // counters + v1ListCalled int32 + v2ListCalled int32 +} + +func (f *fakeTopicClient) Create(string, string, aiven.CreateKafkaTopicRequest) error { + panic("implement me") +} + +func (f *fakeTopicClient) Update(string, string, string, aiven.UpdateKafkaTopicRequest) error { + panic("implement me") +} + +func (f *fakeTopicClient) Delete(string, string, string) error { + panic("implement me") +} + +func (f *fakeTopicClient) List(project, service string) ([]*aiven.KafkaListTopic, error) { + atomic.AddInt32(&f.v1ListCalled, 1) + key := newKey(project, service) + "/" + result := make([]*aiven.KafkaListTopic, 0) + for k, v := range f.storage { + if strings.HasPrefix(k, key) { + result = append(result, v) + } + } + return result, f.v1ListErr +} + +func (f *fakeTopicClient) V2List(project, service string, topicNames []string) ([]*aiven.KafkaTopic, error) { + atomic.AddInt32(&f.v2ListCalled, 1) + result := make([]*aiven.KafkaTopic, 0) + for _, n := range topicNames { + v, ok := f.storage[newKey(project, service, n)] + if ok { + result = append(result, &aiven.KafkaTopic{TopicName: v.TopicName}) + } + } + return result, f.v2ListErr +} diff --git a/internal/sdkprovider/kafkatopicrepository/update.go b/internal/sdkprovider/kafkatopicrepository/update.go new file mode 100644 index 000000000..ec32c86a8 --- /dev/null +++ b/internal/sdkprovider/kafkatopicrepository/update.go @@ -0,0 +1,11 @@ +package kafkatopicrepository + +import ( + "context" + + "github.com/aiven/aiven-go-client" +) + +func (rep *repo) Update(_ context.Context, project, service, topic string, req aiven.UpdateKafkaTopicRequest) error { + return rep.client.Update(project, service, topic, req) +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 3698f451c..adbc9dd23 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -3,19 +3,17 @@ package kafkatopic import ( "context" "errors" - "fmt" "log" - "time" "github.com/aiven/aiven-go-client" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" "github.com/aiven/terraform-provider-aiven/internal/schemautil" "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig" "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig/stateupgrader" + "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" ) var aivenKafkaTopicSchema = map[string]*schema.Schema{ @@ -269,7 +267,9 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int // aiven.KafkaTopics.Create() function may return 501 on create // Second call might say that topic already exists // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) + client := m.(*aiven.Client) + repo := kafkatopicrepository.NewRepository(client.KafkaTopics) + _, err := repo.Read(ctx, project, serviceName, topicName) // No error means topic exists if err == nil { @@ -289,26 +289,13 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - w := &kafkaTopicCreateWaiter{ - Client: m.(*aiven.Client), - Project: project, - ServiceName: serviceName, - CreateRequest: createRequest, - } - - timeout := d.Timeout(schema.TimeoutCreate) - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = w.Conf(timeout).WaitForStateContext(ctx) + err = repo.Create(ctx, project, serviceName, createRequest) if err != nil { return diag.FromErr(err) } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) - // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field // in the future, a read operation should be called after creation. @@ -375,7 +362,8 @@ func resourceKafkaTopicRead(ctx context.Context, d *schema.ResourceData, m inter return diag.FromErr(err) } - topic, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) + client := m.(*aiven.Client).KafkaTopics + topic, err := kafkatopicrepository.NewRepository(client).Read(ctx, project, serviceName, topicName) // Topics are destroyed when kafka is off // https://docs.aiven.io/docs/platform/concepts/service-power-cycle @@ -446,31 +434,7 @@ func flattenKafkaTopicTags(list []aiven.KafkaTopicTag) []map[string]interface{} return tags } -func getTopic(ctx context.Context, m interface{}, timeout time.Duration, project, serviceName, topicName string) (*aiven.KafkaTopic, error) { - client, ok := m.(*aiven.Client) - if !ok { - return nil, fmt.Errorf("invalid Aiven client") - } - - w, err := newKafkaTopicAvailabilityWaiter(client, project, serviceName, topicName) - if err != nil { - return nil, err - } - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - topic, err := w.Conf(timeout).WaitForStateContext(ctx) - if err != nil { - return nil, err - } - - kt, ok := topic.(aiven.KafkaTopic) - if !ok { - return nil, fmt.Errorf("can't cast value to aiven.KafkaTopic") - } - return &kt, nil -} - -func resourceKafkaTopicUpdate(_ context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { +func resourceKafkaTopicUpdate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { client := m.(*aiven.Client) partitions := d.Get("partitions").(int) @@ -479,7 +443,8 @@ func resourceKafkaTopicUpdate(_ context.Context, d *schema.ResourceData, m inter return diag.FromErr(err) } - err = client.KafkaTopics.Update( + err = kafkatopicrepository.NewRepository(client.KafkaTopics).Update( + ctx, projectName, serviceName, topicName, @@ -509,17 +474,7 @@ func resourceKafkaTopicDelete(ctx context.Context, d *schema.ResourceData, m int return diag.Errorf("cannot delete kafka topic when termination_protection is enabled") } - waiter := TopicDeleteWaiter{ - Client: client, - ProjectName: projectName, - ServiceName: serviceName, - TopicName: topicName, - } - - timeout := d.Timeout(schema.TimeoutDelete) - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = waiter.Conf(timeout).WaitForStateContext(ctx) + err = kafkatopicrepository.NewRepository(client.KafkaTopics).Delete(ctx, projectName, serviceName, topicName) if err != nil { return diag.Errorf("error waiting for Aiven Kafka Topic to be DELETED: %s", err) } @@ -556,41 +511,3 @@ func flattenKafkaTopicConfig(t *aiven.KafkaTopic) []map[string]interface{} { }, } } - -// TopicDeleteWaiter is used to wait for Kafka Topic to be deleted. -type TopicDeleteWaiter struct { - Client *aiven.Client - ProjectName string - ServiceName string - TopicName string -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *TopicDeleteWaiter) RefreshFunc() resource.StateRefreshFunc { - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Delete(w.ProjectName, w.ServiceName, w.TopicName) - if err != nil { - if !aiven.IsNotFound(err) { - return nil, "REMOVING", nil - } - } - - return aiven.KafkaTopic{}, "DELETED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *TopicDeleteWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Delete waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"REMOVING"}, - Target: []string{"DELETED"}, - Refresh: w.RefreshFunc(), - Delay: 1 * time.Second, - Timeout: timeout, - MinTimeout: 1 * time.Second, - } -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go deleted file mode 100644 index e2465684e..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ /dev/null @@ -1,201 +0,0 @@ -package kafkatopic - -import ( - "log" - "sync" - - "github.com/aiven/aiven-go-client" - "golang.org/x/exp/slices" -) - -var topicCache = newTopicCache() - -// kafkaTopicCache represents Kafka Topics cache based on Service and Project identifiers -type kafkaTopicCache struct { - sync.RWMutex - internal map[string]map[string]aiven.KafkaTopic - inQueue map[string][]string - missing map[string][]string - v1list map[string][]string -} - -// newTopicCache creates new instance of Kafka Topic Cache -func newTopicCache() *kafkaTopicCache { - return &kafkaTopicCache{ - internal: make(map[string]map[string]aiven.KafkaTopic), - inQueue: make(map[string][]string), - missing: make(map[string][]string), - v1list: make(map[string][]string), - } -} - -// getTopicCache gets a global Kafka Topics Cache -func getTopicCache() *kafkaTopicCache { - return topicCache -} - -// LoadByProjectAndServiceName returns a list of Kafka Topics stored in the cache for a given Project -// and Service names, or nil if no value is present. -// The ok result indicates whether value was found in the map. -func (t *kafkaTopicCache) LoadByProjectAndServiceName(projectName, serviceName string) (map[string]aiven.KafkaTopic, bool) { - t.RLock() - result, ok := t.internal[projectName+serviceName] - t.RUnlock() - - return result, ok -} - -// LoadByTopicName returns a list of Kafka Topics stored in the cache for a given Project -// and Service names, or nil if no value is present. -// The ok result indicates whether value was found in the map. -func (t *kafkaTopicCache) LoadByTopicName(projectName, serviceName, topicName string) (aiven.KafkaTopic, bool) { - t.RLock() - defer t.RUnlock() - - topics, ok := t.internal[projectName+serviceName] - if !ok { - return aiven.KafkaTopic{State: "CONFIGURING"}, false - } - - result, ok := topics[topicName] - if !ok { - result.State = "CONFIGURING" - } - - log.Printf("[TRACE] retrieving from a topic cache `%+#v` for a topic name `%s`", result, topicName) - - return result, ok -} - -// DeleteByProjectAndServiceName deletes the cache value for a key which is a combination of Project -// and Service names. -func (t *kafkaTopicCache) DeleteByProjectAndServiceName(projectName, serviceName string) { - t.Lock() - delete(t.internal, projectName+serviceName) - t.Unlock() -} - -// StoreByProjectAndServiceName sets the values for a Project name and Service name key. -func (t *kafkaTopicCache) StoreByProjectAndServiceName(projectName, serviceName string, list []*aiven.KafkaTopic) { - if len(list) == 0 { - return - } - - log.Printf("[DEBUG] Updating Kafka Topic cache for project %s and service %s ...", projectName, serviceName) - - for _, topic := range list { - t.Lock() - if _, ok := t.internal[projectName+serviceName]; !ok { - t.internal[projectName+serviceName] = make(map[string]aiven.KafkaTopic) - } - t.internal[projectName+serviceName][topic.TopicName] = *topic - - // when topic is added to cache, it need to be deleted from the queue - for i, name := range t.inQueue[projectName+serviceName] { - if name == topic.TopicName { - t.inQueue[projectName+serviceName] = append(t.inQueue[projectName+serviceName][:i], t.inQueue[projectName+serviceName][i+1:]...) - } - } - - t.Unlock() - } -} - -// AddToQueue adds a topic name to a queue of topics to be found -func (t *kafkaTopicCache) AddToQueue(projectName, serviceName, topicName string) { - var isFound bool - - t.Lock() - // check if topic is already in the queue - for _, name := range t.inQueue[projectName+serviceName] { - if name == topicName { - isFound = true - } - } - - _, inCache := t.internal[projectName+serviceName][topicName] - // the only topic that is not in the queue nor inside cache can be added to the queue - if !isFound && !inCache { - t.inQueue[projectName+serviceName] = append(t.inQueue[projectName+serviceName], topicName) - } - t.Unlock() -} - -// DeleteFromQueueAndMarkMissing topic from the queue and marks it as missing -func (t *kafkaTopicCache) DeleteFromQueueAndMarkMissing(projectName, serviceName, topicName string) { - t.Lock() - for k, name := range t.inQueue[projectName+serviceName] { - if name == topicName { - t.inQueue[projectName+serviceName] = slices.Delete(t.inQueue[projectName+serviceName], k, k+1) - } - } - - t.missing[projectName+serviceName] = append(t.missing[projectName+serviceName], topicName) - t.Unlock() -} - -// GetMissing retrieves a list of missing topics -func (t *kafkaTopicCache) GetMissing(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - return t.missing[projectName+serviceName] -} - -// GetQueue retrieves a topics queue, retrieves up to 100 first elements -func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - if len(t.inQueue[projectName+serviceName]) >= 100 { - return t.inQueue[projectName+serviceName][:99] - } - - return t.inQueue[projectName+serviceName] -} - -// SetV1List sets v1 topics list -func (t *kafkaTopicCache) SetV1List(projectName, serviceName string, list []*aiven.KafkaListTopic) { - t.Lock() - for _, v := range list { - t.v1list[projectName+serviceName] = append(t.v1list[projectName+serviceName], v.TopicName) - } - t.Unlock() -} - -// GetV1List retrieves a list of V1 kafka topic names -func (t *kafkaTopicCache) GetV1List(projectName, serviceName string) []string { - t.RLock() - defer t.RUnlock() - - return t.v1list[projectName+serviceName] -} - -// DeleteTopicFromCache Invalidates cache for the topic -func DeleteTopicFromCache(projectName, serviceName, topicName string) { - t := getTopicCache() - t.Lock() - key := projectName + serviceName - for k, name := range t.missing[key] { - if name == topicName { - if l := len(t.missing[key]); k+1 > l { - t.missing[key] = t.missing[key][:l-1] - continue - } - t.missing[key] = slices.Delete(t.missing[key], k, k+1) - } - } - for k, name := range t.v1list[key] { - if name == topicName { - if l := len(t.v1list[key]); k+1 > l { - t.v1list[key] = t.v1list[key][:l-1] - continue - } - t.v1list[key] = slices.Delete(t.v1list[key], k, k+1) - } - } - if t.internal[key] != nil { - delete(t.internal[key], topicName) - } - t.Unlock() -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go deleted file mode 100644 index 822506221..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache_test.go +++ /dev/null @@ -1,192 +0,0 @@ -package kafkatopic - -import ( - "reflect" - "testing" - - "github.com/aiven/aiven-go-client" -) - -func TestTopicCache_LoadByProjectAndServiceName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - want map[string]aiven.KafkaTopic - want1 bool - }{ - { - "not_found", - func(*kafkaTopicCache) { - }, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - nil, - false, - }, - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - map[string]aiven.KafkaTopic{ - "topic-1": { - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - "topic-2": { - Replication: 1, - State: "AVAILABLE", - TopicName: "topic-2", - }, - }, - true, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if !reflect.DeepEqual(got, tt.want) { - t1.Errorf("LoadByProjectAndServiceName() got = %v, want %v", got, tt.want) - } - if got1 != tt.want1 { - t1.Errorf("LoadByProjectAndServiceName() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} - -func TestTopicCache_LoadByTopicName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - topicName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - want aiven.KafkaTopic - want1 bool - }{ - { - "not_found", - func(*kafkaTopicCache) { - - }, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - topicName: "topic-1", - }, - aiven.KafkaTopic{ - State: "CONFIGURING", - }, - false, - }, - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - topicName: "topic-1", - }, - aiven.KafkaTopic{ - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - true, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByTopicName(tt.args.projectName, tt.args.serviceName, tt.args.topicName) - if !reflect.DeepEqual(got, tt.want) { - t1.Errorf("LoadByTopicName() got = %v, want %v", got, tt.want) - } - if got1 != tt.want1 { - t1.Errorf("LoadByTopicName() got1 = %v, want %v", got1, tt.want1) - } - }) - } -} - -func TestTopicCache_DeleteByProjectAndServiceName(t1 *testing.T) { - type args struct { - projectName string - serviceName string - } - tests := []struct { - name string - doSomething func(*kafkaTopicCache) - args args - }{ - { - "basic", - testAddTwoTopicsToCache, - args{ - projectName: "test-pr1", - serviceName: "test-sr1", - }, - }, - } - for _, tt := range tests { - t := newTopicCache() - tt.doSomething(t) - - t1.Run(tt.name, func(t1 *testing.T) { - got, got1 := t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if len(got) == 0 { - t1.Errorf("LoadByProjectAndServiceName() got = %v", got) - } - if got1 != true { - t1.Errorf("LoadByProjectAndServiceName() got1 = %v", got1) - } - - t.DeleteByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - - got, got1 = t.LoadByProjectAndServiceName(tt.args.projectName, tt.args.serviceName) - if len(got) != 0 { - t1.Errorf("After deletion LoadByProjectAndServiceName() should be empty, got = %v", got) - } - if got1 != false { - t1.Errorf("After deletion LoadByProjectAndServiceName() got1 whould be false = %v", got1) - } - }) - } -} - -func testAddTwoTopicsToCache(c *kafkaTopicCache) { - c.StoreByProjectAndServiceName( - "test-pr1", - "test-sr1", - []*aiven.KafkaTopic{ - { - Replication: 3, - State: "AVAILABLE", - TopicName: "topic-1", - }, - { - Replication: 1, - State: "AVAILABLE", - TopicName: "topic-2", - }, - }) -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go deleted file mode 100644 index 21f21e71b..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go +++ /dev/null @@ -1,65 +0,0 @@ -package kafkatopic - -import ( - "log" - "time" - - "github.com/aiven/aiven-go-client" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" -) - -// kafkaTopicCreateWaiter is used to create topics. Since topics are often -// created right after Kafka service is created there may be temporary issues -// that prevent creating the topics like all brokers not being online. This -// allows retrying the operation until failing it. -type kafkaTopicCreateWaiter struct { - Client *aiven.Client - Project string - ServiceName string - CreateRequest aiven.CreateKafkaTopicRequest -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { - // Should check if topic does not exist before create - // Assumes it exists, should prove it doesn't by getting no error - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Create( - w.Project, - w.ServiceName, - w.CreateRequest, - ) - - if err != nil { - // If some brokers are offline while the request is being executed - // the operation may fail. - aivenError, ok := err.(aiven.Error) - if !ok { - return nil, "", err - } - - if !aiven.IsAlreadyExists(aivenError) { - log.Printf("[DEBUG] Got error %v while waiting for topic to be created.", aivenError) - return nil, "CREATING", nil - } - } - - return w.CreateRequest.TopicName, "CREATED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Create waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CREATING"}, - Target: []string{"CREATED"}, - Refresh: w.RefreshFunc(), - Delay: 5 * time.Second, - Timeout: timeout, - MinTimeout: 10 * time.Second, - } -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index 4c77f3f41..1772b098b 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -19,7 +19,7 @@ import ( acc "github.com/aiven/terraform-provider-aiven/internal/acctest" "github.com/aiven/terraform-provider-aiven/internal/schemautil" - "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/service/kafkatopic" + "github.com/aiven/terraform-provider-aiven/internal/sdkprovider/kafkatopicrepository" ) func TestAccAivenKafkaTopic_basic(t *testing.T) { @@ -388,8 +388,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { assert.Nil(t, tc) assert.True(t, aiven.IsNotFound(err)) - // Invalidates cache for the topic - kafkatopic.DeleteTopicFromCache(project, kafkaName, topicName) + // We need to remove it from reps cache + assert.NoError(t, kafkatopicrepository.DeleteTopicCache(project, kafkaName, topicName)) }, // Now plan shows a diff ExpectNonEmptyPlan: true, diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go deleted file mode 100644 index a45575c49..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ /dev/null @@ -1,139 +0,0 @@ -package kafkatopic - -import ( - "fmt" - "log" - "time" - - "github.com/aiven/aiven-go-client" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" - "golang.org/x/exp/slices" - "golang.org/x/sync/semaphore" -) - -// kafkaTopicAvailabilityWaiter is used to refresh the Aiven Kafka Topic endpoints when -// provisioning. -type kafkaTopicAvailabilityWaiter struct { - Client *aiven.Client - Project string - ServiceName string - TopicName string -} - -var kafkaTopicAvailabilitySem = semaphore.NewWeighted(1) - -func newKafkaTopicAvailabilityWaiter(client *aiven.Client, project, serviceName, topicName string) (*kafkaTopicAvailabilityWaiter, error) { - if len(project)*len(serviceName)*len(topicName) == 0 { - return nil, fmt.Errorf("return invalid input: project=%q, serviceName=%q, topicName=%q", project, serviceName, topicName) - } - return &kafkaTopicAvailabilityWaiter{ - Client: client, - Project: project, - ServiceName: serviceName, - TopicName: topicName, - }, nil -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { - return func() (interface{}, string, error) { - cache := getTopicCache() - - // Checking if the topic is in the missing list. If so, trowing 404 error - if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { - return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)} - } - - topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - err := w.refresh() - - if err != nil { - return nil, "CONFIGURING", err - } - - topic, ok = cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - return nil, "CONFIGURING", nil - } - } - - log.Printf("[DEBUG] Got `%s` state while waiting for topic `%s` to be up.", topic.State, w.TopicName) - - return topic, topic.State, nil - } -} - -func (w *kafkaTopicAvailabilityWaiter) refresh() error { - if !kafkaTopicAvailabilitySem.TryAcquire(1) { - log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") - getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName) - return nil - } - defer kafkaTopicAvailabilitySem.Release(1) - - c := getTopicCache() - - // check if topic is already in cache - if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok { - return nil - } - - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) - - for { - queue := c.GetQueue(w.Project, w.ServiceName) - if len(queue) == 0 { - break - } - - log.Printf("[DEBUG] Kafka Topic queue: %+v", queue) - v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) - if err != nil { - // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch - // do not exist but does not say which ones are missing. Therefore, we need to - // identify the none existing topics. - if aiven.IsNotFound(err) { - // Caching a list of all topics for a service from v1 GET endpoint. - // Aiven has a request-per-minute limit; therefore, to minimize - // the request count, we query the V1 list endpoint. - if len(c.GetV1List(w.Project, w.ServiceName)) == 0 { - list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) - if err != nil { - return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - c.SetV1List(w.Project, w.ServiceName, list) - } - - // If topic is missing in V1 list then it does not exist, flagging it as missing - for _, t := range queue { - if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { - c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) - } - } - return nil - } - return err - } - - getTopicCache().StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) - } - - return nil -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicAvailabilityWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Kafka Topic availability waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CONFIGURING"}, - Target: []string{"ACTIVE"}, - Refresh: w.RefreshFunc(), - Timeout: timeout, - PollInterval: 5 * time.Second, - NotFoundChecks: 100, - } -}