Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(topic): performance for read and creation #1393

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 4 additions & 12 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,18 +269,11 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists
// So to be sure, better check it before create
_, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName)

// No error means topic exists
if err == nil {
client := m.(*aiven.Client)
if isTopicExists(ctx, client, project, serviceName, topicName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider adding a comment above this block to briefly explain its purpose for clarity

return diag.Errorf("Topic conflict, already exists: %s", topicName)
}

// If this is not "does not exist", then something happened
if !aiven.IsNotFound(err) {
return diag.FromErr(err)
}

createRequest := aiven.CreateKafkaTopicRequest{
Partitions: &partitions,
Replication: &replication,
Expand All @@ -289,7 +282,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
Tags: getTags(d),
}

err = m.(*aiven.Client).KafkaTopics.Create(
err := client.KafkaTopics.Create(
ctx,
project,
serviceName,
Expand All @@ -301,8 +294,7 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int

d.SetId(schemautil.BuildResourceID(project, serviceName, topicName))

// Invalidates cache for the topic
DeleteTopicFromCache(project, serviceName, topicName)
getTopicCache().AddToQueue(project, serviceName, topicName)

// We do not call a Kafka Topic read here to speed up the performance.
// However, in the case of Kafka Topic resource getting a computed field
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,3 +201,11 @@ func DeleteTopicFromCache(projectName, serviceName, topicName string) {
}
t.Unlock()
}

// GetFullQueue retrieves a topics queue
func (t *kafkaTopicCache) GetFullQueue(projectName, serviceName string) []string {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ensure the method name accurately represents its purpose; If the function retrieves a specific type of queue or has a unique behavior, be more explicit in the name

t.RLock()
defer t.RUnlock()

return t.inQueue[projectName+serviceName]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider handling a scenario where the key might not exist in the inQueue map; directly accessing a non-existent key will return a nil slice, which might be unexpected for the function caller

}
40 changes: 40 additions & 0 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_exists.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package kafkatopic

import (
"context"
"log"
"sync"

"github.com/aiven/aiven-go-client/v2"
"golang.org/x/exp/slices"
)

var onceForService sync.Map
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

consider renaming the variable to be more descriptive, like onceCheckTopicForService or something similar


// isTopicExists checks if topic exists
func isTopicExists(ctx context.Context, client *aiven.Client, project, serviceName, topic string) bool {
c := getTopicCache()

var err error
once, _ := onceForService.LoadOrStore(project+"/"+serviceName, new(sync.Once))
once.(*sync.Once).Do(func() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add comments for better clarity, especially on the significance of using sync.Once

var list []*aiven.KafkaListTopic
list, err = client.KafkaTopics.List(ctx, project, serviceName)
if err != nil {
return
}

c.SetV1List(project, serviceName, list)
})

if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this approach can be confusing: if possible, consider handling the error within the once.Do block, or refactoring the approach to make the flow clearer

log.Printf("[ERROR] cannot check kafka topic existence: %s", err)
return false
}

if slices.Contains(c.GetV1List(project, serviceName), topic) || slices.Contains(c.GetFullQueue(project, serviceName), topic) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for better performance, if the topic is found in the first c.GetV1List check, there's no need to check in the second c.GetFullQueue call: you can return true immediately

return true
}

return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) {
RefreshState: true,
},
{
ExpectNonEmptyPlan: true,
// Step 3: recreates the topic
Config: config,
Check: resource.ComposeTestCheckFunc(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {

func (w *kafkaTopicAvailabilityWaiter) refresh() error {
c := getTopicCache()
c.AddToQueue(w.Project, w.ServiceName, w.TopicName)

if !kafkaTopicAvailabilitySem.TryAcquire(1) {
log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...")
Expand All @@ -86,8 +87,6 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error {
return nil
}

c.AddToQueue(w.Project, w.ServiceName, w.TopicName)

queue := c.GetQueue(w.Project, w.ServiceName)
if len(queue) == 0 {
return nil
Expand Down