Skip to content

Commit

Permalink
feat(kafkatopic): creation simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-savciuc authored and Serpentiel committed Sep 18, 2023
1 parent 7b54651 commit daa9322
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 95 deletions.
26 changes: 12 additions & 14 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"log"
"os"
"time"

"github.com/aiven/aiven-go-client"
Expand Down Expand Up @@ -289,26 +290,23 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
Tags: getTags(d),
}

w := &kafkaTopicCreateWaiter{
Client: m.(*aiven.Client),
Project: project,
ServiceName: serviceName,
CreateRequest: createRequest,
err = m.(*aiven.Client).KafkaTopics.Create(
project,
serviceName,
createRequest,
)
if err != nil && !aiven.IsAlreadyExists(err) {
diag.FromErr(err)
}

timeout := d.Timeout(schema.TimeoutCreate)

// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext.
_, err = w.Conf(timeout).WaitForStateContext(ctx)
if err != nil {
return diag.FromErr(err)
// Invalidates cache for the topic
// We need this only for acceptance tests
if os.Getenv("TF_ACC") == "1" {
DeleteTopicFromCache(project, serviceName, topicName)
}

d.SetId(schemautil.BuildResourceID(project, serviceName, topicName))

// Invalidates cache for the topic
DeleteTopicFromCache(project, serviceName, topicName)

// We do not call a Kafka Topic read here to speed up the performance.
// However, in the case of Kafka Topic resource getting a computed field
// in the future, a read operation should be called after creation.
Expand Down
30 changes: 14 additions & 16 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,27 +172,25 @@ func (t *kafkaTopicCache) GetV1List(projectName, serviceName string) []string {
}

// DeleteTopicFromCache Invalidates cache for the topic
// This function only exists to pass acceptance tests. Cache invalidation
// happens automatically in Terraform when used in the real-life world between
// each subsequent operation. However, during the acceptance test execution,
// we need to mimic the cache invalidation mechanism by calling this function.
func DeleteTopicFromCache(projectName, serviceName, topicName string) {
t := getTopicCache()
t.Lock()
key := projectName + serviceName
for k, name := range t.missing[key] {
if name == topicName {
if l := len(t.missing[key]); k+1 > l {
t.missing[key] = t.missing[key][:l-1]
continue
}
t.missing[key] = slices.Delete(t.missing[key], k, k+1)
}
// Queue and other cache layers should be entirely invalidated because slice capacity
// remains the same after removing one element using slice.Delete, and further down
// the road, we use slice[0:99] and other cases that may restore previously deleted elements.
if t.inQueue[key] != nil {
t.inQueue[key] = make([]string, 0)
}
for k, name := range t.v1list[key] {
if name == topicName {
if l := len(t.v1list[key]); k+1 > l {
t.v1list[key] = t.v1list[key][:l-1]
continue
}
t.v1list[key] = slices.Delete(t.v1list[key], k, k+1)
}
if t.missing[key] != nil {
t.missing[key] = make([]string, 0)
}
if t.v1list != nil {
t.v1list[key] = make([]string, 0)
}
if t.internal[key] != nil {
delete(t.internal[key], topicName)
Expand Down
65 changes: 0 additions & 65 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_create.go

This file was deleted.

0 comments on commit daa9322

Please sign in to comment.