Skip to content

Commit

Permalink
fix(topic): performance for read and creation
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-savciuc committed Oct 24, 2023
1 parent dffd94c commit 0448497
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 19 deletions.
17 changes: 4 additions & 13 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,22 +265,15 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
topicName := d.Get("topic_name").(string)
partitions := d.Get("partitions").(int)
replication := d.Get("replication").(int)
client := m.(*aiven.Client)

// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists
// So to be sure, better check it before create
_, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName)

// No error means topic exists
if err == nil {
if isTopicExists(ctx, client, project, serviceName, topicName) {
return diag.Errorf("Topic conflict, already exists: %s", topicName)
}

// If this is not "does not exist", then something happened
if !aiven.IsNotFound(err) {
return diag.FromErr(err)
}

createRequest := aiven.CreateKafkaTopicRequest{
Partitions: &partitions,
Replication: &replication,
Expand All @@ -289,7 +282,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
Tags: getTags(d),
}

err = m.(*aiven.Client).KafkaTopics.Create(
err := client.KafkaTopics.Create(
ctx,
project,
serviceName,
Expand All @@ -300,9 +293,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
}

d.SetId(schemautil.BuildResourceID(project, serviceName, topicName))

// Invalidates cache for the topic
DeleteTopicFromCache(project, serviceName, topicName)
getTopicCache().AddToQueue(project, serviceName, topicName)

// We do not call a Kafka Topic read here to speed up the performance.
// However, in the case of Kafka Topic resource getting a computed field
Expand Down
12 changes: 10 additions & 2 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,8 +142,8 @@ func (t *kafkaTopicCache) GetMissing(projectName, serviceName string) []string {
return t.missing[projectName+serviceName]
}

// GetQueue retrieves a topics queue, retrieves up to 100 first elements
func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string {
// GetQueueTop100 retrieves a topics queue, retrieves up to 100 first elements
func (t *kafkaTopicCache) GetQueueTop100(projectName, serviceName string) []string {
t.RLock()
defer t.RUnlock()

Expand Down Expand Up @@ -203,3 +203,11 @@ func DeleteTopicFromCache(projectName, serviceName, topicName string) {
}
t.Unlock()
}

// GetFullQueue retrieves a topics queue
func (t *kafkaTopicCache) GetQueue(projectName, serviceName string) []string {
t.RLock()
defer t.RUnlock()

return t.inQueue[projectName+serviceName]
}
42 changes: 42 additions & 0 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package kafkatopic

import (
"context"
"log"
"sync"

"github.com/aiven/aiven-go-client/v2"
"golang.org/x/exp/slices"
)

var onceCheckTopicForService sync.Map

// isTopicExists checks if topic exists
func isTopicExists(ctx context.Context, client *aiven.Client, project, serviceName, topic string) bool {
c := getTopicCache()

var err error

// Warming up of the v1 cache should happen only once per service
once, _ := onceCheckTopicForService.LoadOrStore(project+"/"+serviceName, new(sync.Once))
once.(*sync.Once).Do(func() {
var list []*aiven.KafkaListTopic
list, err = client.KafkaTopics.List(ctx, project, serviceName)
if err != nil {
return
}

c.SetV1List(project, serviceName, list)
})

if err != nil {
log.Printf("[ERROR] cannot check kafka topic existence: %s", err)
return false
}

if slices.Contains(c.GetV1List(project, serviceName), topic) {
return true
}

return slices.Contains(c.GetQueue(project, serviceName), topic)
}
3 changes: 2 additions & 1 deletion internal/sdkprovider/service/kafkatopic/kafka_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) {
},
{
// Step 3: recreates the topic
Config: config,
ExpectNonEmptyPlan: true,
Config: config,
Check: resource.ComposeTestCheckFunc(
// Saved in state
resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID),
Expand Down
5 changes: 2 additions & 3 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {

func (w *kafkaTopicAvailabilityWaiter) refresh() error {
c := getTopicCache()
c.AddToQueue(w.Project, w.ServiceName, w.TopicName)

if !kafkaTopicAvailabilitySem.TryAcquire(1) {
log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...")
Expand All @@ -86,9 +87,7 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error {
return nil
}

c.AddToQueue(w.Project, w.ServiceName, w.TopicName)

queue := c.GetQueue(w.Project, w.ServiceName)
queue := c.GetQueueTop100(w.Project, w.ServiceName)
if len(queue) == 0 {
return nil
}
Expand Down

0 comments on commit 0448497

Please sign in to comment.