From 31353e1adadd15c94b88ad43ae2c859f9436b2c2 Mon Sep 17 00:00:00 2001 From: Ivan Savciuc Date: Tue, 5 Sep 2023 10:59:48 +0300 Subject: [PATCH] fix(kafka_topic): improve performance (#1337) --- CHANGELOG.md | 1 + .../service/kafkatopic/kafka_topic_wait.go | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 79f4bc0f0..f92cc810d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ nav_order: 1 - Fix `aiven_transit_gateway_vpc_attachment` resource update - Fix service IP filters normalization - Fix improper omitting in `ToAPI` +- Fix Kafka Topic perfomance ## [4.8.1] - 2023-08-23 diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index c689665de..a45575c49 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -40,17 +40,6 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { return func() (interface{}, string, error) { cache := getTopicCache() - // Caching a list of all topics for a service from v1 GET endpoint. - // Aiven has a request-per-minute limit; therefore, to minimize - // the request count, we query the V1 list endpoint. - if len(cache.GetV1List(w.Project, w.ServiceName)) == 0 { - list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) - if err != nil { - return nil, "CONFIGURING", fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - cache.SetV1List(w.Project, w.ServiceName, list) - } - // Checking if the topic is in the missing list. If so, trowing 404 error if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)} @@ -79,6 +68,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { func (w *kafkaTopicAvailabilityWaiter) refresh() error { if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") + getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName) return nil } defer kafkaTopicAvailabilitySem.Release(1) @@ -105,6 +95,17 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error { // do not exist but does not say which ones are missing. Therefore, we need to // identify the none existing topics. if aiven.IsNotFound(err) { + // Caching a list of all topics for a service from v1 GET endpoint. + // Aiven has a request-per-minute limit; therefore, to minimize + // the request count, we query the V1 list endpoint. + if len(c.GetV1List(w.Project, w.ServiceName)) == 0 { + list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) + if err != nil { + return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) + } + c.SetV1List(w.Project, w.ServiceName, list) + } + // If topic is missing in V1 list then it does not exist, flagging it as missing for _, t := range queue { if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) {