Skip to content

Commit

Permalink
fix(kafka_topic): improve performance (#1337)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivan-savciuc authored Sep 5, 2023
1 parent 12a6407 commit 31353e1
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ nav_order: 1
- Fix `aiven_transit_gateway_vpc_attachment` resource update
- Fix service IP filters normalization
- Fix improper omitting in `ToAPI`
- Fix Kafka Topic perfomance

## [4.8.1] - 2023-08-23

Expand Down
23 changes: 12 additions & 11 deletions internal/sdkprovider/service/kafkatopic/kafka_topic_wait.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,17 +40,6 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {
return func() (interface{}, string, error) {
cache := getTopicCache()

// Caching a list of all topics for a service from v1 GET endpoint.
// Aiven has a request-per-minute limit; therefore, to minimize
// the request count, we query the V1 list endpoint.
if len(cache.GetV1List(w.Project, w.ServiceName)) == 0 {
list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName)
if err != nil {
return nil, "CONFIGURING", fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err)
}
cache.SetV1List(w.Project, w.ServiceName, list)
}

// Checking if the topic is in the missing list. If so, trowing 404 error
if slices.Contains(cache.GetMissing(w.Project, w.ServiceName), w.TopicName) {
return nil, "CONFIGURING", aiven.Error{Status: 404, Message: fmt.Sprintf("Topic %s is not found", w.TopicName)}
Expand Down Expand Up @@ -79,6 +68,7 @@ func (w *kafkaTopicAvailabilityWaiter) RefreshFunc() resource.StateRefreshFunc {
func (w *kafkaTopicAvailabilityWaiter) refresh() error {
if !kafkaTopicAvailabilitySem.TryAcquire(1) {
log.Printf("[TRACE] Kafka Topic Availability cache refresh already in progress ...")
getTopicCache().AddToQueue(w.Project, w.ServiceName, w.TopicName)
return nil
}
defer kafkaTopicAvailabilitySem.Release(1)
Expand All @@ -105,6 +95,17 @@ func (w *kafkaTopicAvailabilityWaiter) refresh() error {
// do not exist but does not say which ones are missing. Therefore, we need to
// identify the none existing topics.
if aiven.IsNotFound(err) {
// Caching a list of all topics for a service from v1 GET endpoint.
// Aiven has a request-per-minute limit; therefore, to minimize
// the request count, we query the V1 list endpoint.
if len(c.GetV1List(w.Project, w.ServiceName)) == 0 {
list, err := w.Client.KafkaTopics.List(w.Project, w.ServiceName)
if err != nil {
return fmt.Errorf("error calling v1 list for %s/%s: %w", w.Project, w.ServiceName, err)
}
c.SetV1List(w.Project, w.ServiceName, list)
}

// If topic is missing in V1 list then it does not exist, flagging it as missing
for _, t := range queue {
if !slices.Contains(c.GetV1List(w.Project, w.ServiceName), t) {
Expand Down

0 comments on commit 31353e1

Please sign in to comment.