diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic.go b/internal/sdkprovider/service/kafkatopic/kafka_topic.go index 3698f451c..2d8e3b413 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "os" "time" "github.com/aiven/aiven-go-client" @@ -289,26 +290,23 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int Tags: getTags(d), } - w := &kafkaTopicCreateWaiter{ - Client: m.(*aiven.Client), - Project: project, - ServiceName: serviceName, - CreateRequest: createRequest, + err = m.(*aiven.Client).KafkaTopics.Create( + project, + serviceName, + createRequest, + ) + if err != nil && !aiven.IsAlreadyExists(err) { + diag.FromErr(err) } - timeout := d.Timeout(schema.TimeoutCreate) - - // nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext. - _, err = w.Conf(timeout).WaitForStateContext(ctx) - if err != nil { - return diag.FromErr(err) + // Invalidates cache for the topic + // We need this only for acceptance tests + if os.Getenv("TF_ACC") == "1" { + DeleteTopicFromCache(project, serviceName, topicName) } d.SetId(schemautil.BuildResourceID(project, serviceName, topicName)) - // Invalidates cache for the topic - DeleteTopicFromCache(project, serviceName, topicName) - // We do not call a Kafka Topic read here to speed up the performance. // However, in the case of Kafka Topic resource getting a computed field // in the future, a read operation should be called after creation. diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go index e2465684e..43cee833b 100644 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go +++ b/internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go @@ -172,27 +172,25 @@ func (t *kafkaTopicCache) GetV1List(projectName, serviceName string) []string { } // DeleteTopicFromCache Invalidates cache for the topic +// This function only exists to pass acceptance tests. Cache invalidation +// happens automatically in Terraform when used in the real-life world between +// each subsequent operation. However, during the acceptance test execution, +// we need to mimic the cache invalidation mechanism by calling this function. func DeleteTopicFromCache(projectName, serviceName, topicName string) { t := getTopicCache() t.Lock() key := projectName + serviceName - for k, name := range t.missing[key] { - if name == topicName { - if l := len(t.missing[key]); k+1 > l { - t.missing[key] = t.missing[key][:l-1] - continue - } - t.missing[key] = slices.Delete(t.missing[key], k, k+1) - } + // Queue and other cache layers should be entirely invalidated because slice capacity + // remains the same after removing one element using slice.Delete, and further down + // the road, we use slice[0:99] and other cases that may restore previously deleted elements. + if t.inQueue[key] != nil { + t.inQueue[key] = make([]string, 0) } - for k, name := range t.v1list[key] { - if name == topicName { - if l := len(t.v1list[key]); k+1 > l { - t.v1list[key] = t.v1list[key][:l-1] - continue - } - t.v1list[key] = slices.Delete(t.v1list[key], k, k+1) - } + if t.missing[key] != nil { + t.missing[key] = make([]string, 0) + } + if t.v1list != nil { + t.v1list[key] = make([]string, 0) } if t.internal[key] != nil { delete(t.internal[key], topicName) diff --git a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go b/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go deleted file mode 100644 index 21f21e71b..000000000 --- a/internal/sdkprovider/service/kafkatopic/kafka_topic_create.go +++ /dev/null @@ -1,65 +0,0 @@ -package kafkatopic - -import ( - "log" - "time" - - "github.com/aiven/aiven-go-client" - "github.com/hashicorp/terraform-plugin-testing/helper/resource" -) - -// kafkaTopicCreateWaiter is used to create topics. Since topics are often -// created right after Kafka service is created there may be temporary issues -// that prevent creating the topics like all brokers not being online. This -// allows retrying the operation until failing it. -type kafkaTopicCreateWaiter struct { - Client *aiven.Client - Project string - ServiceName string - CreateRequest aiven.CreateKafkaTopicRequest -} - -// RefreshFunc will call the Aiven client and refresh it's state. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc { - // Should check if topic does not exist before create - // Assumes it exists, should prove it doesn't by getting no error - return func() (interface{}, string, error) { - err := w.Client.KafkaTopics.Create( - w.Project, - w.ServiceName, - w.CreateRequest, - ) - - if err != nil { - // If some brokers are offline while the request is being executed - // the operation may fail. - aivenError, ok := err.(aiven.Error) - if !ok { - return nil, "", err - } - - if !aiven.IsAlreadyExists(aivenError) { - log.Printf("[DEBUG] Got error %v while waiting for topic to be created.", aivenError) - return nil, "CREATING", nil - } - } - - return w.CreateRequest.TopicName, "CREATED", nil - } -} - -// Conf sets up the configuration to refresh. -// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated resource.StateRefreshFunc. -func (w *kafkaTopicCreateWaiter) Conf(timeout time.Duration) *resource.StateChangeConf { - log.Printf("[DEBUG] Create waiter timeout %.0f minutes", timeout.Minutes()) - - return &resource.StateChangeConf{ - Pending: []string{"CREATING"}, - Target: []string{"CREATED"}, - Refresh: w.RefreshFunc(), - Delay: 5 * time.Second, - Timeout: timeout, - MinTimeout: 10 * time.Second, - } -}