diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 3698f451c..6e70301eb 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -266,21 +266,6 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int partitions := d.Get("partitions").(int) replication := d.Get("replication").(int) - // aiven.KafkaTopics.Create() function may return 501 on create - // Second call might say that topic already exists - // So to be sure, better check it before create - _, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName) - - // No error means topic exists - if err == nil { - return diag.Errorf("Topic conflict, already exists: %s", topicName) - } - - // If this is not "does not exist", then something happened - if !aiven.IsNotFound(err) { - return diag.FromErr(err) - } - createRequest := aiven.CreateKafkaTopicRequest{ Partitions: &partitions, Replication: &replication, @@ -299,16 +284,13 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int timeout := d.Timeout(schema.TimeoutCreate) // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = w.Conf(timeout).WaitForStateContext(ctx) + _, err := w.Conf(timeout).WaitForStateContext(ctx) if err != nil { - return diag.FromErr(err) + return diag.Errorf("error creating topic: %s", err) } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) - // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field // in the future, a read operation should be called after creation. diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go index 21f21e71b..3c486c8bb 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go @@ -1,6 +1,7 @@ package kafkatopic import ( + "fmt" "log" "time" @@ -17,6 +18,7 @@ type kafkaTopicCreateWaiter struct { Project string ServiceName string CreateRequest aiven.CreateKafkaTopicRequest + c int } // RefreshFunc will call the Aiven client and refresh it's state. @@ -25,6 +27,18 @@ func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { // Should check if topic does not exist before create // Assumes it exists, should prove it doesn't by getting no error return func() (interface{}, string, error) { + defer func() { w.c++ }() + + if w.c == 0 { + topic, err := w.Client.KafkaTopics.Get(w.Project, w.ServiceName, w.CreateRequest.TopicName) + if err != nil && !aiven.IsNotFound(err) { + return nil, "", fmt.Errorf("error checking topic creation preconditions: %w", err) + } + if topic != nil { + return nil, "", fmt.Errorf("Topic conflict already exists: %s", w.CreateRequest.TopicName) + } + } + err := w.Client.KafkaTopics.Create( w.Project, w.ServiceName, @@ -36,7 +50,13 @@ func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { // the operation may fail. aivenError, ok := err.(aiven.Error) if !ok { - return nil, "", err + return nil, "", fmt.Errorf("cannot create kafka topic: %w", err) + } + + // On the first try to create a topic, we should receive the + // `Already exists` error, therefore reporting it. + if w.c == 0 && aiven.IsAlreadyExists(aivenError) { + return nil, "", aivenError } if !aiven.IsAlreadyExists(aivenError) { diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go index 4c77f3f41..4c79c7578 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_test.go @@ -397,7 +397,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) { }, { // Step 3: recreates the topic - Config: config, + Config: config, + ExpectNonEmptyPlan: true, Check: resource.ComposeTestCheckFunc( // Saved in state resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID), @@ -529,7 +530,7 @@ func TestAccAivenKafkaTopic_conflicts_if_exists(t *testing.T) { Steps: []resource.TestStep{ { Config: testAccAivenKafkaTopicConflictsIfExists(prefix, project), - ExpectError: regexp.MustCompile(`Topic conflict, already exists`), + ExpectError: regexp.MustCompile(`Topic conflict already exists`), }, }, }) diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go index a45575c49..04e142a43 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go @@ -42,21 +42,12 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { // Checking if the topic is in the missing list. If so, trowing 404 error if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) { - return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)} + return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("topic `%s` is not found", w.TopicName)} } topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) if !ok { - err := w.refresh() - - if err != nil { - return nil, "CONFIGURING", err - } - - topic, ok = cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName) - if !ok { - return nil, "CONFIGURING", nil - } + return nil, "CONFIGURING", w.refresh() } log.Printf("[DEBUG] Got `%s` state while waiting for topic `%s` to be up.", topic.State, w.TopicName) @@ -66,60 +57,54 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc { } func (w *kafkaTopicAvailabilityWaiter) refresh() error { + c := getTopicCache() + c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + if !kafkaTopicAvailabilitySem.TryAcquire(1) { log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...") - getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName) return nil } defer kafkaTopicAvailabilitySem.Release(1) - c := getTopicCache() - // check if topic is already in cache if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok { return nil } - c.AddToQueue(w.Project, w.ServiceName, w.TopicName) + queue := c.GetQueue(w.Project, w.ServiceName) + if len(queue) == 0 { + return nil + } - for { - queue := c.GetQueue(w.Project, w.ServiceName) - if len(queue) == 0 { - break - } + log.Printf("[DEBUG] kakfa topic queue : %+v", queue) + v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) + if err != nil { + // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch + // do not exist but does not say which ones are missing. Therefore, we need to + // identify the none existing topics. + if aiven.IsNotFound(err) { + log.Printf("[DEBUG] v2 list 404 error, queue : %+v, error: %s", queue, err) - log.Printf("[DEBUG] Kafka Topic queue: %+v", queue) - v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue) - if err != nil { - // V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch - // do not exist but does not say which ones are missing. Therefore, we need to - // identify the none existing topics. - if aiven.IsNotFound(err) { - // Caching a list of all topics for a service from v1 GET endpoint. - // Aiven has a request-per-minute limit; therefore, to minimize - // the request count, we query the V1 list endpoint. - if len(c.GetV1List(w.Project, w.ServiceName)) == 0 { - list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) - if err != nil { - return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) - } - c.SetV1List(w.Project, w.ServiceName, list) - } + list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName) + if err != nil { + return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err) + } + log.Printf("[DEBUG] v1 list results : %+v", list) + c.SetV1List(w.Project, w.ServiceName, list) - // If topic is missing in V1 list then it does not exist, flagging it as missing - for _, t := range queue { - if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { - c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) - } + // If topic is missing in V1 list then it does not exist, flagging it as missing + for _, t := range queue { + if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) { + c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t) } - return nil } - return err + return nil } - - getTopicCache().StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return err } + c.StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics) + return nil }