diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index 4c77f3f41..4c79c7578 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -397,7 +397,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { }, { // Step 3: recreates the topic - Config: config, + Config: config, + ExpectNonEmptyPlan: true, Check: resource.ComposeTestCheckFunc( // Saved in state resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID), @@ -529,7 +530,7 @@ func TestAccAivenKafkaTopic_conflicts_if_exists(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccAivenKafkaTopicConflictsIfExists(prefix, project), - ExpectError: regexp.MustCompile(`Topic conflict, already exists`), + ExpectError: regexp.MustCompile(`Topic conflict already exists`), }, }, }) diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index a45575c49..69cf055c1 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -42,14 +42,12 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { // Checking if the topic is in the missing list. If so, trowing 404 error if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { - return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)} + return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("topic `%s` is not found", w.TopicName)} } topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) if !ok { - err := w.refresh() - - if err != nil { + if err := w.refresh(); err != nil { return nil, "CONFIGURING", err } @@ -66,60 +64,54 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { } func (w *kafkaTopicAvailabilityWaiter) refresh() error { + c := getTopicCache() + c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") - getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName) return nil } defer kafkaTopicAvailabilitySem.Release(1) - c := getTopicCache() - // check if topic is already in cache if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok { return nil } - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + queue := c.GetQueue(w.Project, w.ServiceName) + if len(queue) == 0 { + return nil + } - for { - queue := c.GetQueue(w.Project, w.ServiceName) - if len(queue) == 0 { - break - } + log.Printf("[DEBUG] kakfa topic queue : %+v", queue) + v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) + if err != nil { + // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch + // do not exist but does not say which ones are missing. Therefore, we need to + // identify the none existing topics. + if aiven.IsNotFound(err) { + log.Printf("[DEBUG] v2 list 404 error, queue : %+v, error: %s", queue, err) - log.Printf("[DEBUG] Kafka Topic queue: %+v", queue) - v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) - if err != nil { - // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch - // do not exist but does not say which ones are missing. Therefore, we need to - // identify the none existing topics. - if aiven.IsNotFound(err) { - // Caching a list of all topics for a service from v1 GET endpoint. - // Aiven has a request-per-minute limit; therefore, to minimize - // the request count, we query the V1 list endpoint. - if len(c.GetV1List(w.Project, w.ServiceName)) == 0 { - list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) - if err != nil { - return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - c.SetV1List(w.Project, w.ServiceName, list) - } + list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) + if err != nil { + return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) + } + log.Printf("[DEBUG] v1 list results : %+v", list) + c.SetV1List(w.Project, w.ServiceName, list) - // If topic is missing in V1 list then it does not exist, flagging it as missing - for _, t := range queue { - if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { - c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) - } + // If topic is missing in V1 list then it does not exist, flagging it as missing + for _, t := range queue { + if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { + c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) } - return nil } - return err + return nil } - - getTopicCache().StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return err } + c.StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return nil }