diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 3698f451c..3558c547b 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "sync" "time" "github.com/aiven/aiven-go-client" @@ -12,6 +13,8 @@ import ( "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation" "github.com/hashicorp/terraform-plugin-testing/helper/resource" + "golang.org/x/exp/slices" + "golang.org/x/sync/semaphore" "github.com/aiven/terraform-provider-aiven/internal/schemautil" "github.com/aiven/terraform-provider-aiven/internal/schemautil/userconfig" @@ -259,27 +262,18 @@ func ResourceKafkaTopic() *schema.Resource { } } +var kafkaTopicCreateSem = semaphore.NewWeighted(1) +var onceCreate sync.Once + func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m interface{}) diag.Diagnostics { project := d.Get("project").(string) serviceName := d.Get("service_name").(string) topicName := d.Get("topic_name").(string) partitions := d.Get("partitions").(int) replication := d.Get("replication").(int) - - // aiven.KafkaTopics.Create() function may return 501 on create - // Second call might say that topic already exists - // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) - - // No error means topic exists - if err == nil { - return diag.Errorf("Topic conflict, already exists: %s", topicName) - } - - // If this is not "does not exist", then something happened - if !aiven.IsNotFound(err) { - return diag.FromErr(err) - } + c := getTopicCache() + client := m.(*aiven.Client) + var err error createRequest := aiven.CreateKafkaTopicRequest{ Partitions: &partitions, @@ -289,26 +283,39 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - w := &kafkaTopicCreateWaiter{ - Client: m.(*aiven.Client), - Project: project, - ServiceName: serviceName, - CreateRequest: createRequest, + onceCreate.Do(func() { + var list []*aiven.KafkaListTopic + list, err = client.KafkaTopics.List(project, serviceName) + if err != nil { + return + } + + c.SetV1List(project, serviceName, list) + }) + if err != nil { + diag.Errorf("cannot check kafka topic existence: %s", err) } - timeout := d.Timeout(schema.TimeoutCreate) + // Checking if the Kafka topic was already in the queue and, if not, setting it atomically + _ = kafkaTopicCreateSem.Acquire(ctx, 1) + if slices.Contains(c.GetV1List(project, serviceName), topicName) || + slices.Contains(c.GetFullQueue(project, serviceName), topicName) { + return diag.Errorf("Topic conflict already exists: %s", topicName) + } + c.AddToQueue(project, serviceName, topicName) + kafkaTopicCreateSem.Release(1) - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = w.Conf(timeout).WaitForStateContext(ctx) - if err != nil { - return diag.FromErr(err) + err = client.KafkaTopics.Create( + project, + serviceName, + createRequest, + ) + if err != nil && !aiven.IsAlreadyExists(err) { + diag.FromErr(err) } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) - // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field // in the future, a read operation should be called after creation. diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go index e2465684e..f8b30635c 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go @@ -142,6 +142,14 @@ func (t *kafkaTopicCache) GetMissing(projectName, serviceName string) []string { return t.missing[projectName+serviceName] } +// GetFullQueue retrieves a topics queue +func (t *kafkaTopicCache) GetFullQueue(projectName, serviceName string) []string { + t.RLock() + defer t.RUnlock() + + return t.inQueue[projectName+serviceName] +} + // GetQueue retrieves a topics queue, retrieves up to 100 first elements func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string { t.RLock() @@ -157,6 +165,7 @@ func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string { // SetV1List sets v1 topics list func (t *kafkaTopicCache) SetV1List(projectName, serviceName string, list []*aiven.KafkaListTopic) { t.Lock() + t.v1list[projectName+serviceName] = nil for _, v := range list { t.v1list[projectName+serviceName] = append(t.v1list[projectName+serviceName], v.TopicName) } diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go deleted file mode 100644 index 21f21e71b..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go +++ /dev/null @@ -1,65 +0,0 @@ -package kafkatopic - -import ( - "log" - "time" - - "github.com/aiven/aiven-go-client" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" -) - -// kafkaTopicCreateWaiter is used to create topics. Since topics are often -// created right after Kafka service is created there may be temporary issues -// that prevent creating the topics like all brokers not being online. This -// allows retrying the operation until failing it. -type kafkaTopicCreateWaiter struct { - Client *aiven.Client - Project string - ServiceName string - CreateRequest aiven.CreateKafkaTopicRequest -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { - // Should check if topic does not exist before create - // Assumes it exists, should prove it doesn't by getting no error - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Create( - w.Project, - w.ServiceName, - w.CreateRequest, - ) - - if err != nil { - // If some brokers are offline while the request is being executed - // the operation may fail. - aivenError, ok := err.(aiven.Error) - if !ok { - return nil, "", err - } - - if !aiven.IsAlreadyExists(aivenError) { - log.Printf("[DEBUG] Got error %v while waiting for topic to be created.", aivenError) - return nil, "CREATING", nil - } - } - - return w.CreateRequest.TopicName, "CREATED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Create waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CREATING"}, - Target: []string{"CREATED"}, - Refresh: w.RefreshFunc(), - Delay: 5 * time.Second, - Timeout: timeout, - MinTimeout: 10 * time.Second, - } -} diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index 4c77f3f41..4c79c7578 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -397,7 +397,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { }, { // Step 3: recreates the topic - Config: config, + Config: config, + ExpectNonEmptyPlan: true, Check: resource.ComposeTestCheckFunc( // Saved in state resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID), @@ -529,7 +530,7 @@ func TestAccAivenKafkaTopic_conflicts_if_exists(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccAivenKafkaTopicConflictsIfExists(prefix, project), - ExpectError: regexp.MustCompile(`Topic conflict, already exists`), + ExpectError: regexp.MustCompile(`Topic conflict already exists`), }, }, }) diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index a45575c49..04e142a43 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -42,21 +42,12 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { // Checking if the topic is in the missing list. If so, trowing 404 error if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { - return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)} + return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("topic `%s` is not found", w.TopicName)} } topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) if !ok { - err := w.refresh() - - if err != nil { - return nil, "CONFIGURING", err - } - - topic, ok = cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - return nil, "CONFIGURING", nil - } + return nil, "CONFIGURING", w.refresh() } log.Printf("[DEBUG] Got `%s` state while waiting for topic `%s` to be up.", topic.State, w.TopicName) @@ -66,60 +57,54 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { } func (w *kafkaTopicAvailabilityWaiter) refresh() error { + c := getTopicCache() + c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") - getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName) return nil } defer kafkaTopicAvailabilitySem.Release(1) - c := getTopicCache() - // check if topic is already in cache if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok { return nil } - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + queue := c.GetQueue(w.Project, w.ServiceName) + if len(queue) == 0 { + return nil + } - for { - queue := c.GetQueue(w.Project, w.ServiceName) - if len(queue) == 0 { - break - } + log.Printf("[DEBUG] kakfa topic queue : %+v", queue) + v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) + if err != nil { + // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch + // do not exist but does not say which ones are missing. Therefore, we need to + // identify the none existing topics. + if aiven.IsNotFound(err) { + log.Printf("[DEBUG] v2 list 404 error, queue : %+v, error: %s", queue, err) - log.Printf("[DEBUG] Kafka Topic queue: %+v", queue) - v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) - if err != nil { - // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch - // do not exist but does not say which ones are missing. Therefore, we need to - // identify the none existing topics. - if aiven.IsNotFound(err) { - // Caching a list of all topics for a service from v1 GET endpoint. - // Aiven has a request-per-minute limit; therefore, to minimize - // the request count, we query the V1 list endpoint. - if len(c.GetV1List(w.Project, w.ServiceName)) == 0 { - list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) - if err != nil { - return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - c.SetV1List(w.Project, w.ServiceName, list) - } + list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) + if err != nil { + return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) + } + log.Printf("[DEBUG] v1 list results : %+v", list) + c.SetV1List(w.Project, w.ServiceName, list) - // If topic is missing in V1 list then it does not exist, flagging it as missing - for _, t := range queue { - if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { - c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) - } + // If topic is missing in V1 list then it does not exist, flagging it as missing + for _, t := range queue { + if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { + c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) } - return nil } - return err + return nil } - - getTopicCache().StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return err } + c.StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return nil }