diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 49075d5da..99ebe4036 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -265,22 +265,15 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int topicName := d.Get("topic_name").(string) partitions := d.Get("partitions").(int) replication := d.Get("replication").(int) + client := m.(*aiven.Client) // aiven.KafkaTopics.Create() function may return 501 on create // Second call might say that topic already exists // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) - - // No error means topic exists - if err == nil { + if isTopicExists(ctx, client, project, serviceName, topicName) { return diag.Errorf("Topic conflict, already exists: %s", topicName) } - // If this is not "does not exist", then something happened - if !aiven.IsNotFound(err) { - return diag.FromErr(err) - } - createRequest := aiven.CreateKafkaTopicRequest{ Partitions: &partitions, Replication: &replication, @@ -289,7 +282,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - err = m.(*aiven.Client).KafkaTopics.Create( + err := client.KafkaTopics.Create( ctx, project, serviceName, @@ -300,9 +293,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) + getTopicCache().AddToQueue(project, serviceName, topicName) // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go index 932759eb0..29dbec7b5 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go @@ -142,8 +142,8 @@ func (t *kafkaTopicCache) GetMissing(projectName, serviceName string) []string { return t.missing[projectName+serviceName] } -// GetQueue retrieves a topics queue, retrieves up to 100 first elements -func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string { +// GetQueueTop100 retrieves a topics queue, retrieves up to 100 first elements +func (t *kafkaTopicCache) GetQueueTop100(projectName, serviceName string) []string { t.RLock() defer t.RUnlock() @@ -203,3 +203,11 @@ func DeleteTopicFromCache(projectName, serviceName, topicName string) { } t.Unlock() } + +// GetFullQueue retrieves a topics queue +func (t *kafkaTopicCache) GetFullQueue(projectName, serviceName string) []string { + t.RLock() + defer t.RUnlock() + + return t.inQueue[projectName+serviceName] +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go new file mode 100644 index 000000000..afb5b5703 --- /dev/null +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go @@ -0,0 +1,42 @@ +package kafkatopic + +import ( + "context" + "log" + "sync" + + "github.com/aiven/aiven-go-client/v2" + "golang.org/x/exp/slices" +) + +var onceCheckTopicForService sync.Map + +// isTopicExists checks if topic exists +func isTopicExists(ctx context.Context, client *aiven.Client, project, serviceName, topic string) bool { + c := getTopicCache() + + var err error + + // Warming up of the v1 cache should happen only once per service + once, _ := onceCheckTopicForService.LoadOrStore(project+"/"+serviceName, new(sync.Once)) + once.(*sync.Once).Do(func() { + var list []*aiven.KafkaListTopic + list, err = client.KafkaTopics.List(ctx, project, serviceName) + if err != nil { + return + } + + c.SetV1List(project, serviceName, list) + }) + + if err != nil { + log.Printf("[ERROR] cannot check kafka topic existence: %s", err) + return false + } + + if slices.Contains(c.GetV1List(project, serviceName), topic) { + return true + } + + return slices.Contains(c.GetFullQueue(project, serviceName), topic) +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index d0813db82..3f20bfa4e 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -401,7 +401,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { }, { // Step 3: recreates the topic - Config: config, + ExpectNonEmptyPlan: true, + Config: config, Check: resource.ComposeTestCheckFunc( // Saved in state resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID), diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index e93357936..143c953fe 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -74,6 +74,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { func (w *kafkaTopicAvailabilityWaiter) refresh() error { c := getTopicCache() + c.AddToQueue(w.Project, w.ServiceName, w.TopicName) if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") @@ -86,9 +87,7 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error { return nil } - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) - - queue := c.GetQueue(w.Project, w.ServiceName) + queue := c.GetQueueTop100(w.Project, w.ServiceName) if len(queue) == 0 { return nil }