Skip to content

Commit

Permalink
feat(kafkatopic): improve performance
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-savciuc committed Sep 11, 2023
1 parent 7e58842 commit 0e4c0ad
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 69 deletions.
22 changes: 2 additions & 20 deletions internal/sdkprovider/service/kafkatopic/kafka_topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,21 +266,6 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
partitions := d.Get("partitions").(int)
replication := d.Get("replication").(int)

// aiven.KafkaTopics.Create() function may return 501 on create
// Second call might say that topic already exists
// So to be sure, better check it before create
_, err := getTopic(ctx, m, d.Timeout(schema.TimeoutRead), project, serviceName, topicName)

// No error means topic exists
if err == nil {
return diag.Errorf("Topic conflict, already exists: %s", topicName)
}

// If this is not "does not exist", then something happened
if !aiven.IsNotFound(err) {
return diag.FromErr(err)
}

createRequest := aiven.CreateKafkaTopicRequest{
Partitions: &partitions,
Replication: &replication,
Expand All @@ -299,16 +284,13 @@ func resourceKafkaTopicCreate(ctx context.Context, d *schema.ResourceData, m int
timeout := d.Timeout(schema.TimeoutCreate)

// nolint:staticcheck // TODO: Migrate to helper/retry package to avoid deprecated WaitForStateContext.
_, err = w.Conf(timeout).WaitForStateContext(ctx)
_, err := w.Conf(timeout).WaitForStateContext(ctx)
if err != nil {
return diag.FromErr(err)
return diag.Errorf("error creating topic: %s", err)
}

d.SetId(schemautil.BuildResourceID(project, serviceName, topicName))

// Invalidates cache for the topic
DeleteTopicFromCache(project, serviceName, topicName)

// We do not call a Kafka Topic read here to speed up the performance.
// However, in the case of Kafka Topic resource getting a computed field
// in the future, a read operation should be called after creation.
Expand Down
22 changes: 21 additions & 1 deletion internal/sdkprovider/service/kafkatopic/kafka_topic_create.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package kafkatopic

import (
"fmt"
"log"
"time"

Expand All @@ -17,6 +18,7 @@ type kafkaTopicCreateWaiter struct {
Project string
ServiceName string
CreateRequest aiven.CreateKafkaTopicRequest
c int
}

// RefreshFunc will call the Aiven client and refresh it's state.
Expand All @@ -25,6 +27,18 @@ func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc {
// Should check if topic does not exist before create
// Assumes it exists, should prove it doesn't by getting no error
return func() (interface{}, string, error) {
defer func() { w.c++ }()

if w.c == 0 {
topic, err := w.Client.KafkaTopics.Get(w.Project, w.ServiceName, w.CreateRequest.TopicName)
if err != nil && !aiven.IsNotFound(err) {
return nil, "", fmt.Errorf("error checking topic creation preconditions: %w", err)
}
if topic != nil {
return nil, "", fmt.Errorf("Topic conflict already exists: %s", w.CreateRequest.TopicName)

Check failure on line 38 in internal/sdkprovider/service/kafkatopic/kafka_topic_create.go

View workflow job for this annotation

GitHub Actions / make_lint

ST1005: error strings should not be capitalized (stylecheck)
}
}

err := w.Client.KafkaTopics.Create(
w.Project,
w.ServiceName,
Expand All @@ -36,7 +50,13 @@ func (w *kafkaTopicCreateWaiter) RefreshFunc() resource.StateRefreshFunc {
// the operation may fail.
aivenError, ok := err.(aiven.Error)
if !ok {
return nil, "", err
return nil, "", fmt.Errorf("cannot create kafka topic: %w", err)
}

// On the first try to create a topic, we should receive the
// `Already exists` error, therefore reporting it.
if w.c == 0 && aiven.IsAlreadyExists(aivenError) {
return nil, "", aivenError
}

if !aiven.IsAlreadyExists(aivenError) {
Expand Down
5 changes: 3 additions & 2 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,8 @@ func TestAccAivenKafkaTopic_recreate_missing(t *testing.T) {
},
{
// Step 3: recreates the topic
Config: config,
Config: config,
ExpectNonEmptyPlan: true,
Check: resource.ComposeTestCheckFunc(
// Saved in state
resource.TestCheckResourceAttr(kafkaResource, "id", kafkaID),
Expand Down Expand Up @@ -529,7 +530,7 @@ func TestAccAivenKafkaTopic_conflicts_if_exists(t *testing.T) {
Steps: []resource.TestStep{
{
Config: testAccAivenKafkaTopicConflictsIfExists(prefix, project),
ExpectError: regexp.MustCompile(`Topic conflict, already exists`),
ExpectError: regexp.MustCompile(`Topic conflict already exists`),
},
},
})
Expand Down
77 changes: 31 additions & 46 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,21 +42,12 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {

// Checking if the topic is in the missing list. If so, trowing 404 error
if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) {
return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)}
return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("topic `%s` is not found", w.TopicName)}
}

topic, ok := cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName)
if !ok {
err := w.refresh()

if err != nil {
return nil, "CONFIGURING", err
}

topic, ok = cache.LoadByTopicName(w.Project, w.ServiceName, w.TopicName)
if !ok {
return nil, "CONFIGURING", nil
}
return nil, "CONFIGURING", w.refresh()
}

log.Printf("[DEBUG] Got `%s` state while waiting for topic `%s` to be up.", topic.State, w.TopicName)
Expand All @@ -66,60 +57,54 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {
}

func (w *kafkaTopicAvailabilityWaiter) refresh() error {
c := getTopicCache()
c.AddToQueue(w.Project, w.ServiceName, w.TopicName)

if !kafkaTopicAvailabilitySem.TryAcquire(1) {
log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...")
getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName)
return nil
}
defer kafkaTopicAvailabilitySem.Release(1)

c := getTopicCache()

// check if topic is already in cache
if _, ok := c.LoadByTopicName(w.Project, w.ServiceName, w.TopicName); ok {
return nil
}

c.AddToQueue(w.Project, w.ServiceName, w.TopicName)
queue := c.GetQueue(w.Project, w.ServiceName)
if len(queue) == 0 {
return nil
}

for {
queue := c.GetQueue(w.Project, w.ServiceName)
if len(queue) == 0 {
break
}
log.Printf("[DEBUG] kakfa topic queue : %+v", queue)
v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue)
if err != nil {
// V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch
// do not exist but does not say which ones are missing. Therefore, we need to
// identify the none existing topics.
if aiven.IsNotFound(err) {
log.Printf("[DEBUG] v2 list 404 error, queue : %+v, error: %s", queue, err)

log.Printf("[DEBUG] Kafka Topic queue: %+v", queue)
v2Topics, err := w.Client.KafkaTopics.V2List(w.Project, w.ServiceName, queue)
if err != nil {
// V2 Kafka Topic endpoint retrieves 404 when one or more topics in the batch
// do not exist but does not say which ones are missing. Therefore, we need to
// identify the none existing topics.
if aiven.IsNotFound(err) {
// Caching a list of all topics for a service from v1 GET endpoint.
// Aiven has a request-per-minute limit; therefore, to minimize
// the request count, we query the V1 list endpoint.
if len(c.GetV1List(w.Project, w.ServiceName)) == 0 {
list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName)
if err != nil {
return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err)
}
c.SetV1List(w.Project, w.ServiceName, list)
}
list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName)
if err != nil {
return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err)
}
log.Printf("[DEBUG] v1 list results : %+v", list)
c.SetV1List(w.Project, w.ServiceName, list)

// If topic is missing in V1 list then it does not exist, flagging it as missing
for _, t := range queue {
if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) {
c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t)
}
// If topic is missing in V1 list then it does not exist, flagging it as missing
for _, t := range queue {
if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) {
c.DeleteFromQueueAndMarkMissing(w.Project, w.ServiceName, t)
}
return nil
}
return err
return nil
}

getTopicCache().StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics)
return err
}

c.StoreByProjectAndServiceName(w.Project, w.ServiceName, v2Topics)

return nil
}

Expand Down

0 comments on commit 0e4c0ad

Please sign in to comment.