From 456dac33351d0452da1aaba7f110ce3e3219dda2 Mon Sep 17 00:00:00 2001 From: Ivan Savciuc Date: Fri, 13 Oct 2023 12:38:56 +0300 Subject: [PATCH] fix(topic): performance for read and creation --- .../service/kafkatopic/kafka_topic.go | 16 ++------ .../service/kafkatopic/kafka_topic_cache.go | 8 ++++ .../service/kafkatopic/kafka_topic_exists.go | 40 +++++++++++++++++++ .../service/kafkatopic/kafka_topic_test.go | 1 + .../service/kafkatopic/kafka_topic_wait.go | 3 +- 5 files changed, 54 insertions(+), 14 deletions(-) create mode 100644 internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 49075d5da..3b8a80240 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -269,18 +269,11 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int // aiven.KafkaTopics.Create() function may return 501 on create // Second call might say that topic already exists // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) - - // No error means topic exists - if err == nil { + client := m.(*aiven.Client) + if isTopicExists(ctx, client, project, serviceName, topicName) { return diag.Errorf("Topic conflict, already exists: %s", topicName) } - // If this is not "does not exist", then something happened - if !aiven.IsNotFound(err) { - return diag.FromErr(err) - } - createRequest := aiven.CreateKafkaTopicRequest{ Partitions: &partitions, Replication: &replication, @@ -289,7 +282,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - err = m.(*aiven.Client).KafkaTopics.Create( + err := client.KafkaTopics.Create( ctx, project, serviceName, @@ -301,8 +294,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) + getTopicCache().AddToQueue(project, serviceName, topicName) // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go index eed3c903d..d308e41d0 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go @@ -201,3 +201,11 @@ func DeleteTopicFromCache(projectName, serviceName, topicName string) { } t.Unlock() } + +// GetFullQueue retrieves a topics queue +func (t *kafkaTopicCache) GetFullQueue(projectName, serviceName string) []string { + t.RLock() + defer t.RUnlock() + + return t.inQueue[projectName+serviceName] +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go new file mode 100644 index 000000000..de4574711 --- /dev/null +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go @@ -0,0 +1,40 @@ +package kafkatopic + +import ( + "context" + "log" + "sync" + + "github.com/aiven/aiven-go-client/v2" + "golang.org/x/exp/slices" +) + +var onceForService sync.Map + +// isTopicExists checks if topic exists +func isTopicExists(ctx context.Context, client *aiven.Client, project, serviceName, topic string) bool { + c := getTopicCache() + + var err error + once, _ := onceForService.LoadOrStore(project+"/"+serviceName, new(sync.Once)) + once.(*sync.Once).Do(func() { + var list []*aiven.KafkaListTopic + list, err = client.KafkaTopics.List(ctx, project, serviceName) + if err != nil { + return + } + + c.SetV1List(project, serviceName, list) + }) + + if err != nil { + log.Printf("[ERROR] cannot check kafka topic existence: %s", err) + return false + } + + if slices.Contains(c.GetV1List(project, serviceName), topic) || slices.Contains(c.GetFullQueue(project, serviceName), topic) { + return true + } + + return false +} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index d0813db82..8914975b5 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -400,6 +400,7 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { RefreshState: true, }, { + ExpectNonEmptyPlan: true, // Step 3: recreates the topic Config: config, Check: resource.ComposeTestCheckFunc( diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index e93357936..7ce0981e5 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -74,6 +74,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { func (w *kafkaTopicAvailabilityWaiter) refresh() error { c := getTopicCache() + c.AddToQueue(w.Project, w.ServiceName, w.TopicName) if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") @@ -86,8 +87,6 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error { return nil } - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) - queue := c.GetQueue(w.Project, w.ServiceName) if len(queue) == 0 { return nil